From 6034002e2f791b0ba26504407863980b6cefdb5a Mon Sep 17 00:00:00 2001 From: Dan Scales Date: Thu, 16 Jan 2025 13:16:13 -0800 Subject: [PATCH 1/2] GTC-3055 Added new GHG (green-house gas) analysis This analysis computes the DLUC (direct land use change) GHG emissions factors for each location, based on the tree loss information for the location and the specified yield or commodity (which gives a default yield value). The emissions are calculated by a 20-year discounted formula, as described in GHGRawDataGroup.scala. We compute the emissions factors for each year in 2020-2023. The new set of GHG Analysis files are most closely modeled on the ForestChangeDiagnostic analysis files, but are simplified as much as possible. The GHGSummary file has the logic that computes the crop yield for a particular pixel location, including making use of the primary crop yield datasets or the backup yield CSV file. It also computes the emissions due to tree loss. The GHGRawDataGroup file does the emissions factor computation. Here are some of the other needed changes: - Added gross emissions datasets (already in use for some carbon flux analyses) to the Pro dataset catalog. - Added a new dataset per commodity (currently 6 commodities, will add more later) that gives the average crop yield in each 10km area. Added a generic MapspamYield layer to access these commodity datasets. - Added a CSV file with "backup" crop yields per GADM2 area, when the primary crop yield rasters don't have a value for a particular pixel. This CSV file (which is < 26 Mbytes) is broadcast to all nodes (not each task), so it is available for lookup, and avoids Spark tasks swamping the Data API with requests. The CSV file is specified by command-line option and will be placed on S3. - Added a new Feature and FeatureId "gfwpro_ext" that includes "commodity" and "yield" fields, which are needed to specify the exact crop yield or commodity grown at each location. 
- Changed ErrorSummaryRDD to allow passing in the featureId to the polygonalSummary code as part of kwargs. This is needed so we can pass the yield and commodity information into the GHGSummary code. We need the yield/commodity during the per-pixel analysis. - Added a GHG test. The input file for the test case (ghg.tsv) tests a variety of commodities and yields, and even the error case where a default yield for a commodity is not found at all for a location. A partial backup GADM2 yield file is included in the test files (part_yield_spam_gadm2.csv) --- src/main/resources/raster-catalog-pro.json | 32 ++++ .../globalforestwatch/features/Feature.scala | 5 +- .../features/FeatureId.scala | 1 + .../features/GfwProFeatureExt.scala | 23 +++ .../features/GfwProFeatureExtId.scala | 7 + .../layers/MapspamYield.scala | 17 ++ .../summarystats/ErrorSummaryRDD.scala | 92 +++++---- .../summarystats/JobError.scala | 9 + .../summarystats/SummaryMain.scala | 13 +- .../summarystats/ghg/GHGAnalysis.scala | 72 +++++++ .../summarystats/ghg/GHGCommand.scala | 72 +++++++ .../summarystats/ghg/GHGDF.scala | 41 ++++ .../summarystats/ghg/GHGData.scala | 41 ++++ .../summarystats/ghg/GHGDataDouble.scala | 33 ++++ .../summarystats/ghg/GHGDataValueYearly.scala | 39 ++++ .../summarystats/ghg/GHGExport.scala | 46 +++++ .../summarystats/ghg/GHGGrid.scala | 15 ++ .../summarystats/ghg/GHGGridSources.scala | 59 ++++++ .../summarystats/ghg/GHGParser.scala | 13 ++ .../summarystats/ghg/GHGRDD.scala | 49 +++++ .../summarystats/ghg/GHGRawData.scala | 25 +++ .../summarystats/ghg/GHGRawDataGroup.scala | 37 ++++ .../summarystats/ghg/GHGSummary.scala | 177 ++++++++++++++++++ .../summarystats/ghg/GHGTile.scala | 40 ++++ .../summarystats/ghg/package.scala | 13 ++ ...19666-0fb5-4d34-9ca1-4fd602f82489-c000.csv | 8 + src/test/resources/ghg.tsv | 8 + src/test/resources/part_yield_spam_gadm2.csv | 89 +++++++++ .../summarystats/ghg/GHGAnalysisSpec.scala | 70 +++++++ 29 files changed, 1100 insertions(+), 46 deletions(-) 
create mode 100644 src/main/scala/org/globalforestwatch/features/GfwProFeatureExt.scala create mode 100644 src/main/scala/org/globalforestwatch/features/GfwProFeatureExtId.scala create mode 100644 src/main/scala/org/globalforestwatch/layers/MapspamYield.scala create mode 100644 src/main/scala/org/globalforestwatch/summarystats/ghg/GHGAnalysis.scala create mode 100644 src/main/scala/org/globalforestwatch/summarystats/ghg/GHGCommand.scala create mode 100644 src/main/scala/org/globalforestwatch/summarystats/ghg/GHGDF.scala create mode 100644 src/main/scala/org/globalforestwatch/summarystats/ghg/GHGData.scala create mode 100644 src/main/scala/org/globalforestwatch/summarystats/ghg/GHGDataDouble.scala create mode 100644 src/main/scala/org/globalforestwatch/summarystats/ghg/GHGDataValueYearly.scala create mode 100644 src/main/scala/org/globalforestwatch/summarystats/ghg/GHGExport.scala create mode 100644 src/main/scala/org/globalforestwatch/summarystats/ghg/GHGGrid.scala create mode 100644 src/main/scala/org/globalforestwatch/summarystats/ghg/GHGGridSources.scala create mode 100644 src/main/scala/org/globalforestwatch/summarystats/ghg/GHGParser.scala create mode 100644 src/main/scala/org/globalforestwatch/summarystats/ghg/GHGRDD.scala create mode 100644 src/main/scala/org/globalforestwatch/summarystats/ghg/GHGRawData.scala create mode 100644 src/main/scala/org/globalforestwatch/summarystats/ghg/GHGRawDataGroup.scala create mode 100644 src/main/scala/org/globalforestwatch/summarystats/ghg/GHGSummary.scala create mode 100644 src/main/scala/org/globalforestwatch/summarystats/ghg/GHGTile.scala create mode 100644 src/main/scala/org/globalforestwatch/summarystats/ghg/package.scala create mode 100644 src/test/resources/ghg-output/part-00000-a3b19666-0fb5-4d34-9ca1-4fd602f82489-c000.csv create mode 100644 src/test/resources/ghg.tsv create mode 100755 src/test/resources/part_yield_spam_gadm2.csv create mode 100644 
src/test/scala/org/globalforestwatch/summarystats/ghg/GHGAnalysisSpec.scala diff --git a/src/main/resources/raster-catalog-pro.json b/src/main/resources/raster-catalog-pro.json index bf36cc7a..e36c7c1f 100755 --- a/src/main/resources/raster-catalog-pro.json +++ b/src/main/resources/raster-catalog-pro.json @@ -267,6 +267,38 @@ { "name":"col_frontera_agricola", "source_uri":"s3://gfw-data-lake/col_frontera_agricola/v2024/raster/epsg-4326/{grid_size}/{row_count}/category/gdal-geotiff/{tile_id}.tif" + }, + { + "name":"gfw_forest_flux_full_extent_gross_emissions_co2_only_biomass_soil", + "source_uri": "s3://gfw-data-lake/gfw_forest_flux_full_extent_gross_emissions_co2_only_biomass_soil/v20240402/raster/epsg-4326/{grid_size}/{row_count}/Mg_CO2_ha-1/geotiff/{tile_id}.tif" + }, + { + "name":"gfw_forest_flux_full_extent_gross_emissions_non_co2_biomass_soil", + "source_uri": "s3://gfw-data-lake/gfw_forest_flux_full_extent_gross_emissions_non_co2_biomass_soil/v20240402/raster/epsg-4326/{grid_size}/{row_count}/Mg_CO2e_ha-1/geotiff/{tile_id}.tif" + }, + { + "name":"mapspam_yield_coco", + "source_uri":"s3://gfw-data-lake/mapspam_yield_coco/v2020/raster/epsg-4326/{grid_size}/{row_count}/yield/gdal-geotiff/{tile_id}.tif" + }, + { + "name":"mapspam_yield_coff", + "source_uri":"s3://gfw-data-lake/mapspam_yield_coff/v2020/raster/epsg-4326/{grid_size}/{row_count}/yield/gdal-geotiff/{tile_id}.tif" + }, + { + "name":"mapspam_yield_oilp", + "source_uri":"s3://gfw-data-lake/mapspam_yield_oilp/v2020/raster/epsg-4326/{grid_size}/{row_count}/yield/gdal-geotiff/{tile_id}.tif" + }, + { + "name":"mapspam_yield_rubb", + "source_uri":"s3://gfw-data-lake/mapspam_yield_rubb/v2020/raster/epsg-4326/{grid_size}/{row_count}/yield/gdal-geotiff/{tile_id}.tif" + }, + { + "name":"mapspam_yield_soyb", + "source_uri":"s3://gfw-data-lake/mapspam_yield_soyb/v2020/raster/epsg-4326/{grid_size}/{row_count}/yield/gdal-geotiff/{tile_id}.tif" + }, + { + "name":"mapspam_yield_sugc", + 
"source_uri":"s3://gfw-data-lake/mapspam_yield_sugc/v2020/raster/epsg-4326/{grid_size}/{row_count}/yield/gdal-geotiff/{tile_id}.tif" } ] } diff --git a/src/main/scala/org/globalforestwatch/features/Feature.scala b/src/main/scala/org/globalforestwatch/features/Feature.scala index 5adc1851..15d81e7b 100644 --- a/src/main/scala/org/globalforestwatch/features/Feature.scala +++ b/src/main/scala/org/globalforestwatch/features/Feature.scala @@ -52,9 +52,10 @@ object Feature { case "modis" => FireAlertModisFeature case "burned_areas" => BurnedAreasFeature case "gfwpro" => GfwProFeature + case "gfwpro_ext" => GfwProFeatureExt case value => throw new IllegalArgumentException( - s"FeatureType must be one of 'gadm', 'wdpa', 'geostore', 'gfwpro', 'feature', 'viirs', 'modis', or 'burned_areas'. Got $value." + s"FeatureType must be one of 'gadm', 'wdpa', 'geostore', 'gfwpro', 'gfwpro_ext', 'feature', 'viirs', 'modis', or 'burned_areas'. Got $value." ) } -} \ No newline at end of file +} diff --git a/src/main/scala/org/globalforestwatch/features/FeatureId.scala b/src/main/scala/org/globalforestwatch/features/FeatureId.scala index b29a81f2..443fa6c6 100644 --- a/src/main/scala/org/globalforestwatch/features/FeatureId.scala +++ b/src/main/scala/org/globalforestwatch/features/FeatureId.scala @@ -12,6 +12,7 @@ object FeatureId { case "wdpa" => WdpaFeature.getFeatureId(values, true) case "geostore" => GeostoreFeature.getFeatureId(values, true) case "gfwpro" => GfwProFeature.getFeatureId(values, true) + case "gfwpro_ext" => GfwProFeatureExt.getFeatureId(values, true) case "burned_areas" => BurnedAreasFeature.getFeatureId(values, true) case "viirs" => FireAlertViirsFeature.getFeatureId(values, true) case "modis" => FireAlertModisFeature.getFeatureId(values, true) diff --git a/src/main/scala/org/globalforestwatch/features/GfwProFeatureExt.scala b/src/main/scala/org/globalforestwatch/features/GfwProFeatureExt.scala new file mode 100644 index 00000000..179b8191 --- /dev/null +++ 
b/src/main/scala/org/globalforestwatch/features/GfwProFeatureExt.scala @@ -0,0 +1,23 @@ +package org.globalforestwatch.features + +// GfwPro Feature that includes commodity and yield columns for GHG analysis. + +object GfwProFeatureExt extends Feature { + val listIdPos = 0 + val locationIdPos = 1 + val geomPos = 2 + val commodityPos = 3 + val yieldPos = 4 + + val featureIdExpr = "list_id as listId, cast(location_id as int) as locationId, commodity as commodity, yield as yield" + + def getFeatureId(i: Array[String], parsed: Boolean = false): FeatureId = { + + val listId: String = i(0) + val locationId: Int = i(1).toInt + val commodity: String = i(2) + val yieldVal: Float = i(3).toFloat + + GfwProFeatureExtId(listId, locationId, commodity, yieldVal) + } +} diff --git a/src/main/scala/org/globalforestwatch/features/GfwProFeatureExtId.scala b/src/main/scala/org/globalforestwatch/features/GfwProFeatureExtId.scala new file mode 100644 index 00000000..2af50bb5 --- /dev/null +++ b/src/main/scala/org/globalforestwatch/features/GfwProFeatureExtId.scala @@ -0,0 +1,7 @@ +package org.globalforestwatch.features + +// GfwPro FeatureId that includes commodity and yield for GHG analysis. + +case class GfwProFeatureExtId(listId: String, locationId: Int, commodity: String, yieldVal: Float) extends FeatureId { + override def toString: String = s"$listId, $locationId, $commodity, $yieldVal" +} diff --git a/src/main/scala/org/globalforestwatch/layers/MapspamYield.scala b/src/main/scala/org/globalforestwatch/layers/MapspamYield.scala new file mode 100644 index 00000000..b4838160 --- /dev/null +++ b/src/main/scala/org/globalforestwatch/layers/MapspamYield.scala @@ -0,0 +1,17 @@ +package org.globalforestwatch.layers + +import org.globalforestwatch.grids.GridTile + +/** Parameterized layer for all the various Mapspam commodity yield datasets. + * 'commodity' should be one of the four-letter uppercase Mapspam commodity codes, + * such as 'COCO' or 'OILP'. 
+ */ +case class MapspamYield(commodity: String, gridTile: GridTile, kwargs: Map[String, Any]) + extends FloatLayer + with OptionalFLayer { + + val datasetName = s"mapspam_yield_${commodity.toLowerCase()}" + + val uri: String = + uriForGrid(gridTile, kwargs) +} diff --git a/src/main/scala/org/globalforestwatch/summarystats/ErrorSummaryRDD.scala b/src/main/scala/org/globalforestwatch/summarystats/ErrorSummaryRDD.scala index cd4e1588..b4e505c7 100644 --- a/src/main/scala/org/globalforestwatch/summarystats/ErrorSummaryRDD.scala +++ b/src/main/scala/org/globalforestwatch/summarystats/ErrorSummaryRDD.scala @@ -102,22 +102,7 @@ trait ErrorSummaryRDD extends LazyLogging with java.io.Serializable { ) } - // Split features into those that completely contain the current window - // and those that only partially contain it - val windowGeom: Extent = windowLayout.mapTransform.keyToExtent(windowKey) - val (fullWindowFeatures, partialWindowFeatures) = features.partition { - feature => - try { - feature.geom.contains(windowGeom) - } catch { - case e: org.locationtech.jts.geom.TopologyException => - // fallback if JTS can't do the intersection because of a wonky geometry, - // just skip the optimization - false - } - } - - def getSummaryForGeom(featureIds: List[FEATUREID], geom: Geometry): List[(FEATUREID, ValidatedSummary[SUMMARY])] = { + def getSummaryForGeom(geom: Geometry, mykwargs: Map[String, Any]): ValidatedSummary[SUMMARY] = { val summary: Either[JobError, PolygonalSummaryResult[SUMMARY]] = maybeRaster.flatMap { raster => Either.catchNonFatal { @@ -125,9 +110,9 @@ trait ErrorSummaryRDD extends LazyLogging with java.io.Serializable { raster, geom, ErrorSummaryRDD.rasterizeOptions, - kwargs) + mykwargs) }.left.map{ - // TODO: these should be moved into left side of PolygonalSummaryResult in GT + case NoYieldException(msg) => NoYieldError(msg) case ise: java.lang.IllegalStateException => GeometryError(s"IllegalStateException") case te: 
org.locationtech.jts.geom.TopologyException => @@ -140,31 +125,60 @@ trait ErrorSummaryRDD extends LazyLogging with java.io.Serializable { } // Converting to Validated so errors across partial results can be accumulated // @see https://typelevel.org/cats/datatypes/validated.html#validated-vs-either - featureIds.map { id => (id, summary.toValidated) } + summary.toValidated } - // for partial windows, we need to calculate summary for each geometry, - // since they all may have unique intersections with the window - val partialWindowResults = partialWindowFeatures.flatMap { - case feature => - getSummaryForGeom(List(feature.data), feature.geom) - } + if (kwargs.get("includeFeatureId").isDefined) { + // Include the featureId in the kwargs passed to the polygonalSummary + // code. This is to handle the case where the featureId may include + // one or more columns that are used by the analysis. In that case, + // we can't do the optimization where we share fullWindow results + // across features. 
+ val partialSummaries: Array[(FEATUREID, ValidatedSummary[SUMMARY])] = + features.map { feature: Feature[Geometry, FEATUREID] => + val id: FEATUREID = feature.data + (id, getSummaryForGeom(feature.geom, kwargs + ("featureId" -> id))) + } + partialSummaries + } else { + // Split features into those that completely contain the current window + // and those that only partially contain it + val windowGeom: Extent = windowLayout.mapTransform.keyToExtent(windowKey) + val (fullWindowFeatures, partialWindowFeatures) = features.partition { + feature => + try { + feature.geom.contains(windowGeom) + } catch { + case e: org.locationtech.jts.geom.TopologyException => + // fallback if JTS can't do the intersection because of a wonky geometry, + // just skip the optimization + false + } + } - // if there are any full window intersections, we only need to calculate - // the summary for the window, and then tie it to each feature ID - val fullWindowIds = fullWindowFeatures.map { case feature => feature.data}.toList - //if (fullWindowIds.size >= 2) { - // println(s"Re-using results from same full tile ${windowKey} for featureIds ${fullWindowIds}") - //} - val fullWindowResults = - if (fullWindowFeatures.nonEmpty) { - getSummaryForGeom(fullWindowIds, windowGeom) - } else { - List.empty - } + // for partial windows, we need to calculate summary for each geometry, + // since they all may have unique intersections with the window + val partialWindowResults = partialWindowFeatures.map { + case feature => + (feature.data, getSummaryForGeom(feature.geom, kwargs)) + } - // combine results - partialWindowResults ++ fullWindowResults + // if there are any full window intersections, we only need to calculate + // the summary for the window, and then tie it to each feature ID + val fullWindowIds = fullWindowFeatures.map { case feature => feature.data}.toList + //if (fullWindowIds.size >= 2) { + // println(s"Re-using results from same full tile ${windowKey} for featureIds ${fullWindowIds}") + 
//} + val fullWindowResults = + if (fullWindowFeatures.nonEmpty) { + fullWindowIds.map { id => (id, getSummaryForGeom(windowGeom, kwargs)) } + } else { + List.empty + } + + // combine results + partialWindowResults ++ fullWindowResults + } } } diff --git a/src/main/scala/org/globalforestwatch/summarystats/JobError.scala b/src/main/scala/org/globalforestwatch/summarystats/JobError.scala index 150916eb..ed85bc3e 100644 --- a/src/main/scala/org/globalforestwatch/summarystats/JobError.scala +++ b/src/main/scala/org/globalforestwatch/summarystats/JobError.scala @@ -11,7 +11,16 @@ trait JobError case class RasterReadError(msg: String) extends JobError case class GeometryError(msg: String) extends JobError + +/* Error indicating that the location did not intersect the centroid of any + * raster pixels, hence there are no results. */ case object NoIntersectionError extends JobError + +/* Error and exception indicating that no yield could be determined for the specified + * location for use in GHG analysis */ +case class NoYieldError(msg: String) extends JobError +case class NoYieldException(msg: String) extends Exception(msg) + case class MultiError(errors: Set[String]) extends JobError { def addError(err: JobError): MultiError = MultiError(errors + err.toString) def addError(other: MultiError): MultiError = MultiError(errors ++ other.errors) diff --git a/src/main/scala/org/globalforestwatch/summarystats/SummaryMain.scala b/src/main/scala/org/globalforestwatch/summarystats/SummaryMain.scala index 9042b445..3a82c552 100644 --- a/src/main/scala/org/globalforestwatch/summarystats/SummaryMain.scala +++ b/src/main/scala/org/globalforestwatch/summarystats/SummaryMain.scala @@ -10,6 +10,7 @@ import org.globalforestwatch.summarystats.gladalerts.GladAlertsCommand.gladAlert import org.globalforestwatch.summarystats.treecoverloss.TreeCoverLossCommand.treeCoverLossCommand import org.globalforestwatch.summarystats.integrated_alerts.IntegratedAlertsCommand.integratedAlertsCommand import 
org.globalforestwatch.summarystats.afi.AFiCommand.afiCommand +import org.globalforestwatch.summarystats.ghg.GHGCommand.ghgCommand import com.monovore.decline._ object SummaryMain { @@ -25,16 +26,18 @@ object SummaryMain { gladAlertsCommand orElse treeCoverLossCommand orElse integratedAlertsCommand orElse - afiCommand + afiCommand orElse + ghgCommand } val command = Command(name, header, true)(main) final def main(args: Array[String]): Unit = { - // Print out environment variables (for debugging purposes) - val environmentVars = System.getenv().forEach { - case (key, value) => println(s"$key = $value") + // Print out environment variables (if needed for debugging) + if (false) { + val environmentVars = System.getenv().forEach { + case (key, value) => println(s"$key = $value") + } } - command.parse(args, sys.env) match { case Left(help) => System.err.println(help) diff --git a/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGAnalysis.scala b/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGAnalysis.scala new file mode 100644 index 00000000..d87f102b --- /dev/null +++ b/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGAnalysis.scala @@ -0,0 +1,72 @@ +package org.globalforestwatch.summarystats.ghg + +import cats.data.Validated.{Invalid, Valid} +import cats.implicits._ + +import geotrellis.vector.{Feature, Geometry} +import org.locationtech.jts.geom.Geometry +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SparkSession +import org.globalforestwatch.summarystats.{Location, NoIntersectionError, SummaryAnalysis, ValidatedLocation} +import org.apache.spark.storage.StorageLevel +import org.globalforestwatch.ValidatedWorkflow + +object GHGAnalysis extends SummaryAnalysis { + + val name = "ghg" + + /** Greenhouse gas analysis of input features in a TSV file. The TSV file contains + * the individual list items (location IDs >= 0) and optional merged ("dissolved") + * list geometries (location id -1). 
+ * + * This function assumes that all features have already been split by 1x1 degree + * grid, so each location and merged list may have a single or multiple rows. + */ + def apply( + features: RDD[ValidatedLocation[Geometry]], + kwargs: Map[String, Any] + )(implicit spark: SparkSession): RDD[ValidatedLocation[GHGData]] = { + features.persist(StorageLevel.MEMORY_AND_DISK) + + try { + val partialResult: RDD[ValidatedLocation[GHGData]] = { + ValidatedWorkflow(features) + .flatMap { locationGeometries => + val locationSummaries: RDD[ValidatedLocation[GHGSummary]] = { + val tmp = locationGeometries.map { case Location(id, geom) => Feature(geom, id) } + + // This is where the main analysis happens, in ErrorSummaryRDD.apply(), + // which eventually calls into GHGSummary via runPolygonalSummary(). + GHGRDD(tmp, GHGGrid.blockTileGrid, kwargs) + } + + // For all rows that didn't get an error from the GHG analysis, do the + // transformation from GHGSummary to GHGData + ValidatedWorkflow(locationSummaries).mapValid { summaries => + summaries + .mapValues { + case summary: GHGSummary => + val data = summary.toGHGData() + data + } + } + } + .unify + .persist(StorageLevel.MEMORY_AND_DISK) + } + + // If a location has empty GHGData results, then the geometry + // must not have intersected the centroid of any pixels, so report the location + // as NoIntersectionError. 
+ partialResult.map { + case Valid(Location(fid, data)) if data.equals(GHGData.empty) => + Invalid(Location(fid, NoIntersectionError)) + case data => data + } + } catch { + case e: StackOverflowError => + e.printStackTrace() + throw e + } + } +} diff --git a/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGCommand.scala b/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGCommand.scala new file mode 100644 index 00000000..1a6dbb22 --- /dev/null +++ b/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGCommand.scala @@ -0,0 +1,72 @@ +package org.globalforestwatch.summarystats.ghg + +import cats.data.NonEmptyList +import org.globalforestwatch.summarystats.SummaryCommand +import cats.implicits._ +import com.monovore.decline.Opts +import org.globalforestwatch.features._ +import com.typesafe.scalalogging.LazyLogging +import org.globalforestwatch.config.GfwConfig +import org.globalforestwatch.util.Config + +object GHGCommand extends SummaryCommand with LazyLogging { + // Current range of years to do emissions factors for. + // Update GHGYearEnd when new tree loss data becomes available. + val GHGYearStart: Int = 2020 + val GHGYearEnd: Int = 2023 + + val backupYieldOpt: Opts[NonEmptyList[String]] = Opts + .options[String]( + "backup_yield_url", + help = "URI of backup yield-by-gadm2 area file, in CSV format" + ) + + val ghgCommand: Opts[Unit] = Opts.subcommand( + name = GHGAnalysis.name, + help = "Compute greenhouse gas emissions factors for GFW Pro locations." 
+ ) { + ( + defaultOptions, + featureFilterOptions, + backupYieldOpt + ).mapN { (default, filterOptions, backupYieldUrl) => + val kwargs = Map( + "outputUrl" -> default.outputUrl, + "noOutputPathSuffix" -> default.noOutputPathSuffix, + "overwriteOutput" -> default.overwriteOutput, + // Pin the version of gfw_integrated_alerts, so we don't make a data API request for 'latest' + "config" -> GfwConfig.get(Some(NonEmptyList.one(Config("gfw_integrated_alerts", "v20231121")))) + ) + + if (!default.splitFeatures) logger.warn("Forcing splitFeatures = true") + val featureFilter = FeatureFilter.fromOptions(default.featureType, filterOptions) + + runAnalysis { implicit spark => + println("Reading backup yield file") + // Read in the backup yield file. Then arrange to broadcast a copy to + // each node once, rather than copying into each task. The broadcast makes + // sense (as opposed to a rasterization or spatial partitioning), because the + // file is currently less than 26 megabytes. + val backupDF = spark.read + .options(Map("header" -> "true", "delimiter" -> ",", "escape" -> "\"")) + .csv(backupYieldUrl.toList: _*) + val broadcastArray = spark.sparkContext.broadcast(backupDF.collect()) + + // We use the "gfwpro_ext" feature id, which includes the extra "commodity" + // and "yield" columns provided in the input feature file. + val featureRDD = ValidatedFeatureRDD(default.featureUris, "gfwpro_ext", featureFilter, splitFeatures = true) + + // We set the option to include the featureId in the kwargs passed to the + // polygonalSummary for each feature. We need the commodity and yield columns + // in the featureId to determine the yield to use for each pixel. 
+ val fcdRDD = GHGAnalysis( + featureRDD, + kwargs + ("backupYield" -> broadcastArray) + ("includeFeatureId" -> true) + ) + + val fcdDF = GHGDF.getFeatureDataFrame(fcdRDD, spark) + GHGExport.export(default.featureType, fcdDF, default.outputUrl, kwargs) + } + } + } +} diff --git a/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGDF.scala b/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGDF.scala new file mode 100644 index 00000000..755f8f38 --- /dev/null +++ b/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGDF.scala @@ -0,0 +1,41 @@ +package org.globalforestwatch.summarystats.ghg + +import cats.data.Validated.{Invalid, Valid} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.globalforestwatch.features._ +import org.globalforestwatch.summarystats.{ValidatedLocation, Location} +import org.globalforestwatch.util.Util.fieldsFromCol +import org.globalforestwatch.summarystats.SummaryDF +import org.globalforestwatch.summarystats.SummaryDF.{RowError, RowId} + +object GHGDF extends SummaryDF { + + def getFeatureDataFrame( + dataRDD: RDD[ValidatedLocation[GHGData]], + spark: SparkSession + ): DataFrame = { + import spark.implicits._ + + val rowId: FeatureId => RowId = { + case gfwproId: GfwProFeatureExtId => + RowId(gfwproId.listId, gfwproId.locationId.toString) + case id => + throw new IllegalArgumentException(s"Can't produce DataFrame for $id") + } + + dataRDD.map { + case Valid(Location(fid, data)) => + (rowId(fid), RowError.empty, data) + case Invalid(Location(fid, err)) => + (rowId(fid), RowError.fromJobError(err), GHGData.empty) + } + .toDF("id", "error", "data") + .select($"id.*" :: $"error.*" :: fieldsFromCol($"data", featureFields): _*) + } + + val featureFields = List( + "total_area", + "emissions_factor_yearly" + ) +} diff --git a/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGData.scala b/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGData.scala new file mode 100644 
index 00000000..2765daca --- /dev/null +++ b/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGData.scala @@ -0,0 +1,41 @@ +package org.globalforestwatch.summarystats.ghg + +import cats.Semigroup + +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder + +/** Final data for each location. + */ +case class GHGData( + total_area: GHGDataDouble, + emissions_factor_yearly: GHGDataValueYearly +) { + + def merge(other: GHGData): GHGData = { + + GHGData( + total_area.merge(other.total_area), + emissions_factor_yearly.merge(other.emissions_factor_yearly) + ) + } +} + +object GHGData { + + def empty: GHGData = + GHGData( + GHGDataDouble.empty, + GHGDataValueYearly.empty, + ) + + implicit val lossDataSemigroup: Semigroup[GHGData] = + new Semigroup[GHGData] { + def combine(x: GHGData, + y: GHGData): GHGData = + x.merge(y) + } + + implicit def dataExpressionEncoder: ExpressionEncoder[GHGData] = + frameless.TypedExpressionEncoder[GHGData] + .asInstanceOf[ExpressionEncoder[GHGData]] +} diff --git a/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGDataDouble.scala b/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGDataDouble.scala new file mode 100644 index 00000000..3d4797cb --- /dev/null +++ b/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGDataDouble.scala @@ -0,0 +1,33 @@ +package org.globalforestwatch.summarystats.ghg +import frameless.Injection +import org.globalforestwatch.util.Implicits._ +import io.circe.syntax._ + + +case class GHGDataDouble(value: Double) extends GHGDataParser[GHGDataDouble] { + def merge( + other: GHGDataDouble + ): GHGDataDouble = { + GHGDataDouble(value + other.value) + } + + def round: Double = this.round(value) + + def toJson: String = { + this.round.asJson.noSpaces + } +} + +object GHGDataDouble { + def empty: GHGDataDouble = + GHGDataDouble(0) + + def fill(value: Double, + include: Boolean = true): GHGDataDouble = { + GHGDataDouble(value * include) + } + + implicit def injection: Injection[GHGDataDouble, 
String] = + Injection(_.toJson, s => GHGDataDouble(s.toDouble)) + +} diff --git a/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGDataValueYearly.scala b/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGDataValueYearly.scala new file mode 100644 index 00000000..dfd12e56 --- /dev/null +++ b/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGDataValueYearly.scala @@ -0,0 +1,39 @@ +package org.globalforestwatch.summarystats.ghg + +import frameless.Injection + +import scala.collection.immutable.SortedMap +import io.circe.syntax._ +import io.circe.parser.decode +import cats.kernel.Semigroup +import cats.implicits._ + +/** Field value which is a map from years to doubles. + */ +case class GHGDataValueYearly(value: SortedMap[Int, Double]) + extends GHGDataParser[GHGDataValueYearly] { + + def merge(other: GHGDataValueYearly): GHGDataValueYearly = { + GHGDataValueYearly(Semigroup[SortedMap[Int, Double]].combine(value, other.value)) + } + + def round: SortedMap[Int, Double] = this.value.map { case (key, value) => key -> this.round(value) } + + def toJson: String = { + this.round.asJson.noSpaces + } +} + +object GHGDataValueYearly { + def empty: GHGDataValueYearly = + GHGDataValueYearly( + SortedMap() + ) + + def fromString(value: String): GHGDataValueYearly = { + val sortedMap = decode[SortedMap[Int, Double]](value) + GHGDataValueYearly(sortedMap.getOrElse(SortedMap())) + } + + implicit def injection: Injection[GHGDataValueYearly, String] = Injection(_.toJson, fromString) +} diff --git a/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGExport.scala b/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGExport.scala new file mode 100644 index 00000000..d31dd5f5 --- /dev/null +++ b/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGExport.scala @@ -0,0 +1,46 @@ +package org.globalforestwatch.summarystats.ghg + +import org.apache.spark.sql.{DataFrame, SaveMode} +import org.globalforestwatch.summarystats.SummaryExport +import 
org.globalforestwatch.util.Util.getAnyMapValue + +object GHGExport extends SummaryExport { + + override val csvOptions: Map[String, String] = Map( + "header" -> "true", + "delimiter" -> "\t", + "quote" -> "\u0000", + "escape" -> "\u0000", + "quoteMode" -> "NONE", + "nullValue" -> null, + "emptyValue" -> null + ) + + override def export( + featureType: String, + summaryDF: DataFrame, + outputUrl: String, + kwargs: Map[String, Any] + ): Unit = { + val saveMode: SaveMode = + if (getAnyMapValue[Boolean](kwargs, "overwriteOutput")) + SaveMode.Overwrite + else + SaveMode.ErrorIfExists + + featureType match { + case "gfwpro" => + summaryDF + .repartition(1) + .write + .mode(saveMode) + .options(csvOptions) + .csv(path = outputUrl + "/final") + + case _ => + throw new IllegalArgumentException( + "Feature type must be 'gfwpro'" + ) + } + } +} diff --git a/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGGrid.scala b/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGGrid.scala new file mode 100644 index 00000000..319547b3 --- /dev/null +++ b/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGGrid.scala @@ -0,0 +1,15 @@ +package org.globalforestwatch.summarystats.ghg + +import geotrellis.vector.Extent +import org.globalforestwatch.grids.{GridTile, TenByTen30mGrid} + +object GHGGrid + extends TenByTen30mGrid[GHGGridSources] { + + val gridExtent: Extent = Extent(-180.0000, -90.0000, 180.0000, 90.0000) + + def getSources(gridTile: GridTile, + kwargs: Map[String, Any]): GHGGridSources = + GHGGridSources.getCachedSources(gridTile, kwargs) + +} diff --git a/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGGridSources.scala b/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGGridSources.scala new file mode 100644 index 00000000..de5e8912 --- /dev/null +++ b/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGGridSources.scala @@ -0,0 +1,59 @@ +package org.globalforestwatch.summarystats.ghg + +import geotrellis.layer.{LayoutDefinition, 
SpatialKey} +import geotrellis.raster.Raster +import org.globalforestwatch.grids.{GridSources, GridTile} +import org.globalforestwatch.layers._ + +/** + * @param gridTile top left corner, padded from east ex: "10N_010E" + */ +case class GHGGridSources(gridTile: GridTile, kwargs: Map[String, Any]) + extends GridSources { + + val treeCoverLoss: TreeCoverLoss = TreeCoverLoss(gridTile, kwargs) + val treeCoverDensity2000: TreeCoverDensityPercent2000 = TreeCoverDensityPercent2000(gridTile, kwargs) + val grossEmissionsCo2eNonCo2: GrossEmissionsNonCo2Co2eBiomassSoil = GrossEmissionsNonCo2Co2eBiomassSoil(gridTile, kwargs = kwargs) + val grossEmissionsCo2eCo2Only: GrossEmissionsCo2OnlyCo2BiomassSoil = GrossEmissionsCo2OnlyCo2BiomassSoil(gridTile, kwargs = kwargs) + val mapspamCOCOYield: MapspamYield = MapspamYield("COCO", gridTile, kwargs = kwargs) + val mapspamCOFFYield: MapspamYield = MapspamYield("COFF", gridTile, kwargs = kwargs) + val mapspamOILPYield: MapspamYield = MapspamYield("OILP", gridTile, kwargs = kwargs) + val mapspamRUBBYield: MapspamYield = MapspamYield("RUBB", gridTile, kwargs = kwargs) + val mapspamSOYBYield: MapspamYield = MapspamYield("SOYB", gridTile, kwargs = kwargs) + val mapspamSUGCYield: MapspamYield = MapspamYield("SUGC", gridTile, kwargs = kwargs) + val gadmAdm0: GadmAdm0 = GadmAdm0(gridTile, kwargs) + val gadmAdm1: GadmAdm1 = GadmAdm1(gridTile, kwargs) + val gadmAdm2: GadmAdm2 = GadmAdm2(gridTile, kwargs) + + def readWindow( + windowKey: SpatialKey, + windowLayout: LayoutDefinition + ): Either[Throwable, Raster[GHGTile]] = { + + val tile = GHGTile( + windowKey, windowLayout, this + ) + Right(Raster(tile, windowKey.extent(windowLayout))) + } +} + +object GHGGridSources { + + @transient + private lazy val cache = + scala.collection.concurrent.TrieMap + .empty[String, GHGGridSources] + + def getCachedSources( + gridTile: GridTile, + kwargs: Map[String, Any] + ): GHGGridSources = { + + cache.getOrElseUpdate( + gridTile.tileId, + 
GHGGridSources(gridTile, kwargs) + ) + + } + +} diff --git a/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGParser.scala b/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGParser.scala new file mode 100644 index 00000000..8baf9ab1 --- /dev/null +++ b/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGParser.scala @@ -0,0 +1,13 @@ +package org.globalforestwatch.summarystats.ghg + +trait GHGDataParser[Self <: GHGDataParser[Self]] { + val value: Any + + def merge(other: Self): Self + + def toJson: String + + protected def round(value: Double, digits: Int = 4): Double = { + Math.round(value * math.pow(10, digits)) / math.pow(10, digits) + } +} diff --git a/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGRDD.scala b/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGRDD.scala new file mode 100644 index 00000000..ff32152d --- /dev/null +++ b/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGRDD.scala @@ -0,0 +1,49 @@ +package org.globalforestwatch.summarystats.ghg + +import cats.implicits._ +import geotrellis.layer.{LayoutDefinition, SpatialKey} +import geotrellis.raster._ +import geotrellis.raster.rasterize.Rasterizer +import geotrellis.raster.summary.polygonal._ +import geotrellis.vector._ +import org.globalforestwatch.summarystats.ErrorSummaryRDD + +object GHGRDD extends ErrorSummaryRDD { + + type SOURCES = GHGGridSources + type SUMMARY = GHGSummary + type TILE = GHGTile + + def getSources(windowKey: SpatialKey, + windowLayout: LayoutDefinition, + kwargs: Map[String, Any]): Either[Throwable, SOURCES] = { + Either.catchNonFatal { + GHGGrid.getRasterSource( + windowKey, + windowLayout, + kwargs + ) + }.left.map { ex => logger.error("Error in GHGRDD.getSources", ex); ex} + } + + def readWindow( + rs: SOURCES, + windowKey: SpatialKey, + windowLayout: LayoutDefinition + ): Either[Throwable, Raster[TILE]] = + rs.readWindow(windowKey, windowLayout) + + def runPolygonalSummary( + raster: Raster[TILE], + geometry: Geometry, + 
options: Rasterizer.Options, + kwargs: Map[String, Any] + ): PolygonalSummaryResult[SUMMARY] = { + raster.polygonalSummary( + geometry, + GHGSummary.getGridVisitor(kwargs), + options = options + ) + } + +} diff --git a/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGRawData.scala b/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGRawData.scala new file mode 100644 index 00000000..de635415 --- /dev/null +++ b/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGRawData.scala @@ -0,0 +1,25 @@ +package org.globalforestwatch.summarystats.ghg + + +import cats.Semigroup + +/** Summary data per RawDataGroup. + * + * Note: This case class contains mutable values to do accumulation by group + * in GHGSummary.getGridVisitor. + */ +case class GHGRawData(var totalArea: Double, var emissionsCo2e: Double) { + def merge(other: GHGRawData): GHGRawData = { + GHGRawData(totalArea + other.totalArea, emissionsCo2e + other.emissionsCo2e) + } +} + +object GHGRawData { + implicit val lossDataSemigroup: Semigroup[GHGRawData] = + new Semigroup[GHGRawData] { + def combine(x: GHGRawData, + y: GHGRawData): GHGRawData = + x.merge(y) + } + +} diff --git a/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGRawDataGroup.scala b/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGRawDataGroup.scala new file mode 100644 index 00000000..86ebbd2b --- /dev/null +++ b/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGRawDataGroup.scala @@ -0,0 +1,37 @@ +package org.globalforestwatch.summarystats.ghg + +import scala.collection.immutable.SortedMap + +case class GHGRawDataGroup(umdTreeCoverLossYear: Int, + cropYield: Double +) { + + /** Produce a partial GHGData for the loss year, yield, emissions, and area in this + * data group */ + def toGHGData(totalArea: Double, emissionsCo2e: Double): GHGData = { + val minLossYear = GHGCommand.GHGYearStart + val maxLossYear = GHGCommand.GHGYearEnd + + // Emissions factor formula: + // + // 20-year discounted emissions for year 
2020 (tonsCO2e) = + // (co2e emissions 2020 * 0.0975) + (co2e emissions 2019 * 0.0925) + (co2e emissions 2018 * 0.0875) + ... + (co2e emissions 2001 * 0.025) + // production (kg) = yield * area + // emissions factor (tonsCO2e/kg) = discounted emissions / production + + val efList = for (i <- minLossYear to maxLossYear) yield { + val diff = i - umdTreeCoverLossYear + if (diff >= 0 && diff < 20) { + (i -> ((0.0975 - diff * 0.005) * emissionsCo2e) / (cropYield * totalArea)) + } else { + (i -> 0.0) + } + } + + val r = GHGData( + total_area = GHGDataDouble.fill(totalArea), + emissions_factor_yearly = GHGDataValueYearly(SortedMap(efList.toSeq: _*)) + ) + r + } +} diff --git a/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGSummary.scala b/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGSummary.scala new file mode 100644 index 00000000..4e0d21d3 --- /dev/null +++ b/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGSummary.scala @@ -0,0 +1,177 @@ +package org.globalforestwatch.summarystats.ghg + +import cats.implicits._ +import geotrellis.raster._ +import geotrellis.raster.summary.GridVisitor +import org.globalforestwatch.summarystats.Summary +import org.globalforestwatch.util.Geodesy +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.sql.Row +import org.globalforestwatch.features.GfwProFeatureExtId +import scala.collection.mutable +import org.globalforestwatch.summarystats.NoYieldException + +/** GHGRawData broken down by GHGRawDataGroup, which includes the loss year and crop yield */ +case class GHGSummary( + stats: Map[GHGRawDataGroup, + GHGRawData] = Map.empty + ) extends Summary[GHGSummary] { + + /** Combine two Maps by combining GHGRawDataGroup entries that + * have the same values. This merge function is used by + * summaryStats.summarySemigroup to define a combine operation on + * GHGSummary, which is used to combine records with the same + * FeatureId in ErrorSummaryRDD. 
*/ + def merge( + other: GHGSummary + ): GHGSummary = { + // the stats.combine method uses the + // GHGRawData.lossDataSemigroup instance to perform per-value + // combine on the map. + GHGSummary(stats.combine(other.stats)) + } + + /** Pivot raw data to GHGData and aggregate across years */ + def toGHGData(): GHGData = { + if (stats.isEmpty) { + GHGData.empty + } else { + stats + .map { case (group, data) => group.toGHGData(data.totalArea, data.emissionsCo2e) } + .foldLeft(GHGData.empty)(_ merge _) + } + } + + def isEmpty = stats.isEmpty +} + +case class CacheKey(commodity: String, gadmId: String) + +object GHGSummary { + val backupYieldCache = mutable.HashMap[CacheKey, Float]() + + // Cell types of Raster[GHGTile] may not be the same. + def getGridVisitor( + kwargs: Map[String, Any] + ): GridVisitor[Raster[GHGTile], + GHGSummary] = + new GridVisitor[Raster[GHGTile], GHGSummary] { + private var acc: GHGSummary = + new GHGSummary() + + def result: GHGSummary = acc + + def visit(raster: Raster[GHGTile], + col: Int, + row: Int): Unit = { + + // Look up the "backup" yield based on gadm area (possibly using a cached value). 
+ def lookupBackupYield(backupArray: Array[Row], commodity: String, gadmId: String): Float = { + val cached = backupYieldCache.get(CacheKey(commodity, gadmId)) + if (cached.isDefined) { + return cached.get + } + for (r <- backupArray) { + if (r.getAs[String]("GID_2") == gadmId && r.getAs[String]("commodity") == commodity) { + val cropYield = r.getAs[String]("yield_kg_ha").toFloat + backupYieldCache(CacheKey(commodity, gadmId)) = cropYield + println(s"Found backupyield ${cropYield} for ${commodity} in ${gadmId}") + return cropYield + } + } + println(s"No yield found for $commodity in $gadmId") + throw new NoYieldException(s"No yield found for $commodity in $gadmId") + } + + val featureId = kwargs("featureId").asInstanceOf[GfwProFeatureExtId] + + // This is a pixel by pixel operation + + // pixel Area + val re: RasterExtent = raster.rasterExtent + val lat: Double = re.gridRowToMap(row) + // uses Pixel's center coordinate. +/- raster.cellSize.height/2 + // doesn't make much of a difference + val area: Double = Geodesy.pixelArea(lat, re.cellSize) + val areaHa = area / 10000.0 + + // Only count tree loss for canopy > 30% + val tcd2000: Integer = raster.tile.tcd2000.getData(col, row) + val umdTreeCoverLossYear: Int = { + val loss = raster.tile.loss.getData(col, row) + if (loss != null && tcd2000 > 30) { + loss.toInt + } else { + 0 + } + } + + val cropYield = if (umdTreeCoverLossYear == 0) { + // If no tree loss, then there's no need to calculate yield, since there + // were no emissions. 
+ //println("No tree loss") + 0.0 + } else if (featureId.yieldVal > 0.0) { + featureId.yieldVal + } else { + // Get default yield based on commodity + val defaultYield = featureId.commodity match { + case "COCO" => raster.tile.cocoYield.getData(col, row) + case "COFF" => raster.tile.coffYield.getData(col, row) + case "OILP" => raster.tile.oilpYield.getData(col, row) + case "RUBB" => raster.tile.rubbYield.getData(col, row) + case "SOYB" => raster.tile.soybYield.getData(col, row) + case "SUGC" => raster.tile.sugcYield.getData(col, row) + case _ => + println("Invalid commodity ${featureId.commodity}") + throw new Exception("Invalid commodity") + } + if (defaultYield != 0.0) { + //println(s"Yield ${defaultYield}, (${col}, ${row})") + defaultYield + } else { + // If we don't have a yield for this commodity based on the specific pixel, + // then do a lookup for the default yield for the entire gadm2 area this + // location is in. + val gadmAdm0: String = raster.tile.gadmAdm0.getData(col, row) + // Skip processing this pixel if gadmAdm0 is empty + if (gadmAdm0 == "") { + println("Empty gadmAdm0") + return + } + val gadmAdm1: Integer = raster.tile.gadmAdm1.getData(col, row) + val gadmAdm2: Integer = raster.tile.gadmAdm2.getData(col, row) + val gadmId: String = s"$gadmAdm0.$gadmAdm1.${gadmAdm2}_1" + //println(s"Empty ${featureId.commodity} default yield, checking gadm yield for $gadmId") + val backupArray = kwargs("backupYield").asInstanceOf[Broadcast[Array[Row]]].value + val backupYield = lookupBackupYield(backupArray, featureId.commodity, gadmId) + backupYield + } + } + + // Compute gross emissions Co2-equivalent due to tree loss at this pixel. 
+ val grossEmissionsCo2eNonCo2: Float = raster.tile.grossEmissionsCo2eNonCo2.getData(col, row) + val grossEmissionsCo2eCo2Only: Float = raster.tile.grossEmissionsCo2eCo2Only.getData(col, row) + val grossEmissionsCo2eNonCo2Pixel = grossEmissionsCo2eNonCo2 * areaHa + val grossEmissionsCo2eCo2OnlyPixel = grossEmissionsCo2eCo2Only * areaHa + + val groupKey = GHGRawDataGroup(umdTreeCoverLossYear, cropYield) + + // if (umdTreeCoverLossYear > 0) { + // println(s"Yield $cropYield, lossYear $umdTreeCoverLossYear, area $areaHa, co2e ${grossEmissionsCo2eNonCo2Pixel + grossEmissionsCo2eCo2OnlyPixel}") + // } + val summaryData: GHGRawData = + acc.stats.getOrElse( + key = groupKey, + default = GHGRawData(0, 0) + ) + + summaryData.totalArea += areaHa + summaryData.emissionsCo2e += grossEmissionsCo2eNonCo2Pixel + grossEmissionsCo2eCo2OnlyPixel + + val new_stats = acc.stats.updated(groupKey, summaryData) + acc = GHGSummary(new_stats) + + } + } +} diff --git a/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGTile.scala b/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGTile.scala new file mode 100644 index 00000000..53a59edc --- /dev/null +++ b/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGTile.scala @@ -0,0 +1,40 @@ +package org.globalforestwatch.summarystats.ghg + +import geotrellis.raster.{CellGrid, CellType} +import geotrellis.layer.{LayoutDefinition, SpatialKey} + +/** + * + * Tile-like structure to hold tiles from datasets required for our summary. + * We can not use GeoTrellis MultibandTile because it requires all bands share a CellType. 
+ */ +case class GHGTile( + windowKey: SpatialKey, + windowLayout: LayoutDefinition, + sources: GHGGridSources, +) extends CellGrid[Int] { + + lazy val loss = sources.treeCoverLoss.fetchWindow(windowKey, windowLayout) + lazy val tcd2000 = sources.treeCoverDensity2000.fetchWindow(windowKey, windowLayout) + lazy val grossEmissionsCo2eNonCo2 = sources.grossEmissionsCo2eNonCo2.fetchWindow(windowKey, windowLayout) + lazy val grossEmissionsCo2eCo2Only = sources.grossEmissionsCo2eCo2Only.fetchWindow(windowKey, windowLayout) + // It is important that the fetches for the individual yield files and the gadm + // admin areas are lazy, since we will only need access to at most one commodity + // file for any particular location. And we only need to access the gadm area + // values if we can't get a yield value from the relevant commodity file. + lazy val cocoYield = sources.mapspamCOCOYield.fetchWindow(windowKey, windowLayout) + lazy val coffYield = sources.mapspamCOFFYield.fetchWindow(windowKey, windowLayout) + lazy val oilpYield = sources.mapspamOILPYield.fetchWindow(windowKey, windowLayout) + lazy val rubbYield = sources.mapspamRUBBYield.fetchWindow(windowKey, windowLayout) + lazy val soybYield = sources.mapspamSOYBYield.fetchWindow(windowKey, windowLayout) + lazy val sugcYield = sources.mapspamSUGCYield.fetchWindow(windowKey, windowLayout) + lazy val gadmAdm0 = sources.gadmAdm0.fetchWindow(windowKey, windowLayout) + lazy val gadmAdm1 = sources.gadmAdm1.fetchWindow(windowKey, windowLayout) + lazy val gadmAdm2 = sources.gadmAdm2.fetchWindow(windowKey, windowLayout) + + def cellType: CellType = loss.cellType + + def cols: Int = loss.cols + + def rows: Int = loss.rows +} diff --git a/src/main/scala/org/globalforestwatch/summarystats/ghg/package.scala b/src/main/scala/org/globalforestwatch/summarystats/ghg/package.scala new file mode 100644 index 00000000..23cddd27 --- /dev/null +++ b/src/main/scala/org/globalforestwatch/summarystats/ghg/package.scala @@ -0,0 +1,13 @@ +package 
org.globalforestwatch.summarystats + +import frameless.TypedEncoder + +package object ghg { + // Uses the injection defined in the companion object of each of these types. + // See https://typelevel.org/frameless/Injection.html + implicit def dataDoubleTypedEncoder: TypedEncoder[GHGDataDouble] = + TypedEncoder.usingInjection[GHGDataDouble, String] + + implicit def dataValueYearlyTypedEncoder: TypedEncoder[GHGDataValueYearly] = + TypedEncoder.usingInjection[GHGDataValueYearly, String] +} diff --git a/src/test/resources/ghg-output/part-00000-a3b19666-0fb5-4d34-9ca1-4fd602f82489-c000.csv b/src/test/resources/ghg-output/part-00000-a3b19666-0fb5-4d34-9ca1-4fd602f82489-c000.csv new file mode 100644 index 00000000..fc7b9efc --- /dev/null +++ b/src/test/resources/ghg-output/part-00000-a3b19666-0fb5-4d34-9ca1-4fd602f82489-c000.csv @@ -0,0 +1,8 @@ +list_id location_id status_code location_error total_area emissions_factor_yearly +166 -1 2 1006.2585 {"2020":0.1011,"2021":0.1023,"2022":0.1033,"2023":0.1032} +2 1 2 19068.2043 {"2020":0.1525,"2021":0.1678,"2022":0.1756,"2023":0.184} +176 1 2 71923.9299 {"2020":0.2941,"2021":0.3003,"2022":0.3073,"2023":0.3131} +1 3 2 1.2978 {"2020":0.1177,"2021":0.1087,"2022":0.1445,"2023":0.1347} +1036 14697 2 3.826 {"2020":0.0,"2021":0.0,"2022":0.0,"2023":0.0} +166 1 2 1006.2585 {"2020":0.1011,"2021":0.1023,"2022":0.1033,"2023":0.1032} +1 4 3 ["NoYieldError(No yield found for SUGC in PER.23.5_1)"] 0.0 {} diff --git a/src/test/resources/ghg.tsv b/src/test/resources/ghg.tsv new file mode 100644 index 00000000..900ec19b --- /dev/null +++ b/src/test/resources/ghg.tsv @@ -0,0 +1,8 @@ +list_id location_id geom commodity yield +1 3 
010300000001000000410000002212206B4F3F53C069EB247E6CD81CC0D3048C594F3F53C044F3BA646DD81CC08E2BE400503F53C035391C605DD81CC0DCBBC919513F53C0BBF1601E41D81CC060A5DB42523F53C0A70B024223D81CC09584A44C533F53C04DA61E2A0CD81CC05CC0B728543F53C06D5A63AFFBD71CC062FC9919543F53C02E4E8426ECD71CC03FF75521543F53C00AE0A09CECD71CC060CBB981533F53C000537AF7DED71CC05D9AD9E9523F53C02A94E521DCD71CC0B01C26BC523F53C0F6870699CCD71CC0384B1ABA513F53C0274D2078BED71CC07D24C212513F53C069E33C27B3D71CC00248B4B7503F53C02EA8606EA6D71CC0AC94874D503F53C02F1B3AC998D71CC0858441FC503F53C0184E419A8BD71CC0B1F6477D513F53C0C4A481577FD71CC0FC0BD2C6503F53C0E72744DC76D71CC078C201E34F3F53C05F283A7475D71CC01EBA57E84E3F53C01EA9819073D71CC06E0BC4224E3F53C0E458C6DC6ED71CC0448022024D3F53C096A9772872D71CC0E97778074C3F53C0926720CE77D71CC018E83D424B3F53C0C0D37E877CD71CC018E83D424B3F53C0FB25BF0B7CD71CC06F758C6D4A3F53C0E1E367B181D71CC0BBAEB3AF493F53C04AA206EF85D71CC09354F226493F53C0E59D06B691D71CC0364146D3483F53C02F1B3AC998D71CC0B2F775EF473F53C0E957A5BAA1D71CC0DF439E31473F53C0D7E5B78FACD71CC0DC12BE99463F53C01E4FA548B9D71CC0AE7C1A20463F53C0134FA548B9D71CC0F4795F71453F53C0BD1BA8DFC7D71CC0F88461CA443F53C044D63C7CD6D71CC09C581AD7433F53C030D8C14CD1D71CC00F282464433F53C0A57ABD84D5D71CC05920BE6F433F53C0E3A23F13E5D71CC0CC8F0942433F53C06D597C12F1D71CC040C9C936433F53C0E3FC5EE7FFD71CC0DC793BF2423F53C06BCF3EEC0BD81CC01688A34D433F53C05C8DE79111D81CC0DD698E53443F53C0DDA11EF117D81CC00F282464433F53C0E2D4A2D32DD81CC0069E719D433F53C02E999B9052D81CC0BBA5D791433F53C0A06B7B955ED81CC00B7A95D3433F53C0F68B819171D81CC0BA7EEFA6433F53C0B20BFC6469D81CC0BE3676E4433F53C0072B1BC471D81CC00EAB756B443F53C08F13C16080D81CC00BBD6350443F53C057D87B0D9BD81CC011F94541443F53C0785C3F83A1D81CC0DC9F1931443F53C0A27268D8C2D81CC0F2C52DD2433F53C067F0A644DAD81CC07A7ABEC9433F53C0F07B5805F0D81CC0191942A0433F53C01DFF63F301D91CC0909C59064D3F53C060DB5EAF09D91CC0D7A6E1F64C3F53C066AB608EFDD81CC06F88721B4D3F53C0D6233B13F3D81CC0E3BEE71D4D3F53C05E368551EDD81CC0ECE8DB294D3F53C02E571E
28E0D81CC074A419DC4D3F53C0390B1A44B5D81CC07A23CA494E3F53C01FFC805193D81CC02212206B4F3F53C069EB247E6CD81CC0 COCO 0.0 +1 4 010300000001000000410000002212206B4F3F53C069EB247E6CD81CC0D3048C594F3F53C044F3BA646DD81CC08E2BE400503F53C035391C605DD81CC0DCBBC919513F53C0BBF1601E41D81CC060A5DB42523F53C0A70B024223D81CC09584A44C533F53C04DA61E2A0CD81CC05CC0B728543F53C06D5A63AFFBD71CC062FC9919543F53C02E4E8426ECD71CC03FF75521543F53C00AE0A09CECD71CC060CBB981533F53C000537AF7DED71CC05D9AD9E9523F53C02A94E521DCD71CC0B01C26BC523F53C0F6870699CCD71CC0384B1ABA513F53C0274D2078BED71CC07D24C212513F53C069E33C27B3D71CC00248B4B7503F53C02EA8606EA6D71CC0AC94874D503F53C02F1B3AC998D71CC0858441FC503F53C0184E419A8BD71CC0B1F6477D513F53C0C4A481577FD71CC0FC0BD2C6503F53C0E72744DC76D71CC078C201E34F3F53C05F283A7475D71CC01EBA57E84E3F53C01EA9819073D71CC06E0BC4224E3F53C0E458C6DC6ED71CC0448022024D3F53C096A9772872D71CC0E97778074C3F53C0926720CE77D71CC018E83D424B3F53C0C0D37E877CD71CC018E83D424B3F53C0FB25BF0B7CD71CC06F758C6D4A3F53C0E1E367B181D71CC0BBAEB3AF493F53C04AA206EF85D71CC09354F226493F53C0E59D06B691D71CC0364146D3483F53C02F1B3AC998D71CC0B2F775EF473F53C0E957A5BAA1D71CC0DF439E31473F53C0D7E5B78FACD71CC0DC12BE99463F53C01E4FA548B9D71CC0AE7C1A20463F53C0134FA548B9D71CC0F4795F71453F53C0BD1BA8DFC7D71CC0F88461CA443F53C044D63C7CD6D71CC09C581AD7433F53C030D8C14CD1D71CC00F282464433F53C0A57ABD84D5D71CC05920BE6F433F53C0E3A23F13E5D71CC0CC8F0942433F53C06D597C12F1D71CC040C9C936433F53C0E3FC5EE7FFD71CC0DC793BF2423F53C06BCF3EEC0BD81CC01688A34D433F53C05C8DE79111D81CC0DD698E53443F53C0DDA11EF117D81CC00F282464433F53C0E2D4A2D32DD81CC0069E719D433F53C02E999B9052D81CC0BBA5D791433F53C0A06B7B955ED81CC00B7A95D3433F53C0F68B819171D81CC0BA7EEFA6433F53C0B20BFC6469D81CC0BE3676E4433F53C0072B1BC471D81CC00EAB756B443F53C08F13C16080D81CC00BBD6350443F53C057D87B0D9BD81CC011F94541443F53C0785C3F83A1D81CC0DC9F1931443F53C0A27268D8C2D81CC0F2C52DD2433F53C067F0A644DAD81CC07A7ABEC9433F53C0F07B5805F0D81CC0191942A0433F53C01DFF63F301D91CC0909C59064D3F53C060DB5EAF09D91
CC0D7A6E1F64C3F53C066AB608EFDD81CC06F88721B4D3F53C0D6233B13F3D81CC0E3BEE71D4D3F53C05E368551EDD81CC0ECE8DB294D3F53C02E571E28E0D81CC074A419DC4D3F53C0390B1A44B5D81CC07A23CA494E3F53C01FFC805193D81CC02212206B4F3F53C069EB247E6CD81CC0 SUGC 0.0 +2 1 010300000001000000050000009C62A16D184C50C0B84FA8D2121338C065281B76384C50C08412DA29C32938C02245551C564350C0A4D6C768383138C072E6B894554150C058DCDCDFDF0F38C09C62A16D184C50C0B84FA8D2121338C0 SUGC 0.0 +166 1 01030000000100000005000000e7aed1e4156e5840d583f1ef0d6706405123d124216e5840bb5dcfdae43506403dbcd0243d70584057ee7202b43806406a50d1e404705840f75140ece2700640e7aed1e4156e5840d583f1ef0d670640 OILP 0.0 +166 -1 01030000000100000005000000e7aed1e4156e5840d583f1ef0d6706405123d124216e5840bb5dcfdae43506403dbcd0243d70584057ee7202b43806406a50d1e404705840f75140ece2700640e7aed1e4156e5840d583f1ef0d670640 OILP 0.0 +1036 14697 0103000000010000007B0000002387889B534914C09128B4ACFB071840CD902A8A574914C09702D2FE07081840A5828AAA5F4914C0B3D0CE69160818409A081B9E5E4914C041800C1D3B08184023A46E675F4914C09C508880430818406C3D4338664914C05DA5BBEB6C081840508C2C99634914C0524832AB770818408F8EAB915D4914C00723F609A0081840172AFF5A5E4914C05D5320B3B3081840A08B868C474914C079043752B6081840CEFFAB8E1C4914C002A08A1BB708184012BF620D174914C0F6251B0FB6081840F0164850FC4814C0BDE0D39CBC081840A0C03BF9F44814C04B395FECBD081840CE8B135FED4814C06EA7AD11C10818402905DD5ED24814C06EA7AD11C1081840BDFDB968C84814C07364E597C10818403BC8EBC1A44814C0C39D0B23BD0818409624CFF57D4814C051F69672BE0818404C6E14596B4814C018B14F00C50818407A1C06F3574814C0849B8C2AC308184096B036C64E4814C01E51A1BAB9081840D595CFF23C4814C0C9207711A608184096766A2E374814C079CA6ABA9E081840583A1F9E254814C0F111312592081840410C74ED0B4814C09067976F7D081840A2427573F14714C0C98FF8156B0818401F477364E54714C058CB9D9960081840A2258FA7E54714C09010E50B5A08184091EEE714E44714C0D5CF9B8A54081840B33F506EDB4714C096B036C64E0818405E2C0C91D34714C05D4E098849081840807D74EACA4714C07AE2395B40081840200DA7CCCD4714C06F2EFEB6270818406F46CD57C94714C02578431A15081
840C53C2B69C54714C003D0285DFA0718408600E0D8B34714C03CF88903E8071840BF45274BAD4714C0F8FE06EDD50718402B306475AB4714C020D3DA34B607184009DFFB1BB44714C0BA4E232D95071840533E0455A34714C0BFD18E1B7E07184004E8F7FD9B4714C05F44DB31750718408C834BC79C4714C0D1CE691668071840D0420246974714C0322251685907184098E0D407924714C00F971C774A0718407B2FBE688F4714C0AFEC82C135071840530438BD8B4714C05F96766A2E0718408195438B6C4714C0E29178793A071840CB113290674714C0A94C3107410718403DB9A640664714C02C2B4D4A41071840C0401020434714C065AA605452071840E29178793A4714C020EBA9D557071840CB80B3942C4714C0AF264F594D0718402CF180B2294714C0107A36AB3E07184015FDA199274714C05AF624B0390718408D614ED0264714C0541C075E2D071840C0CC77F0134714C093E4B9BE0F071840AF95D05D124714C038143E5B070718408D614ED0264714C054A86E2EFE061840F96871C6304714C076F9D687F5061840D15AD1E6384714C049111956F106184076FEEDB25F4714C00BD5CDC5DF06184042B0AA5E7E4714C016325706D5061840530438BD8B4714C071E5EC9DD10618403C4A253CA14714C082FFAD64C70618405ED5592DB04714C0B587BD50C0061840809A5AB6D64714C0B0AD9FFEB3061840361E6CB1DB4714C0111E6D1CB1061840F855B950F94714C0A5164A26A70618406FD74B53044814C082A8FB00A4061840B3B3E89D0A4814C04489963C9E0618404DA3C9C5184814C08E05854199061840083BC5AA414814C0333509DE90061840C93846B2474814C077F4BF5C8B061840FDFA2136584814C0994528B682061840D5EC8156604814C08351499D8006184046EBA86A824814C07D772B4B74061840A7785C548B4814C03F58C6866E0618400249D8B7934814C02D211FF46C061840B20FB22C984814C09A0B5C1E6B061840E0F76F5E9C4814C077BAF3C4730618408A01124DA04814C011AAD4EC81061840BE892139994814C0A5DC7D8E8F061840D4601A868F4814C04A29E8F6920618409BFEEC478A4814C0A43330F2B2061840DA006C40844814C0E90FCD3CB9061840639CBF09854814C02272FA7ABE0618407F4DD6A8874814C05AF10D85CF061840249A40118B4814C0E38C614ED0061840139D6516A14814C0E8667FA0DC0618405D19541B9C4814C0C075C58CF00618400266BE839F4814C08D0A9C6C0307184079E75086AA4814C0CC29013109071840F6251B0FB64814C0AABBB20B06071840ACA92C0ABB4814C0E200FA7DFF061840F65FE7A6CD4814C0C632FD12F106184001DA56B3CE4814C0AA81E673EE0618402448A5D8D1481
4C0994A3FE1EC061840E57FF277EF4814C00AF2B391EB061840A0C03BF9F44814C0F4FDD478E9061840406D54A7034914C065A54929E80618406D72F8A4134914C0F9BA0CFFE906184089230F44164914C021E692AAED061840FBE769C0204914C0F41ABB44F5061840FBE769C0204914C0AF5B04C6FA06184051DEC7D11C4914C093C7D3F203071840BDC804FC1A4914C0BB0F406A1307184067D2A6EA1E4914C0384E0AF31E071840E4F38AA71E4914C070B03731240718407EC68503214914C0A912656F29071840A034D428244914C03788D68A360718408940F50F224914C07C8159A1480718406D8FDE701F4914C026C5C7276407184001A5A146214914C0988922A46E071840ABAE4335254914C0876F61DD780718400C3CF71E2E4914C04E4700378B0718408F1A13622E4914C086A92D7590071840B1886187314914C004E8F7FD9B071840459E245D334914C075AC527AA6071840CE397826344914C0147651F4C0071840FB213658384914C097715303CD07184045BB0A293F4914C091D10149D80718402387889B534914C09128B4ACFB071840 RUBB 0.0 +176 1 010300000001000000050000000ccc88643be948c0c807b0ae5b9621c00ccc88643be948c0984e5d3c8a0422c0d8bc094710c648c0984e5d3c8a0422c0d8bc094710c648c0c807b0ae5b9621c00ccc88643be948c0c807b0ae5b9621c0 NONE 1500 diff --git a/src/test/resources/part_yield_spam_gadm2.csv b/src/test/resources/part_yield_spam_gadm2.csv new file mode 100755 index 00000000..0820ef27 --- /dev/null +++ b/src/test/resources/part_yield_spam_gadm2.csv @@ -0,0 +1,89 @@ +commodity,GID_0,NAME_0,GID_1,NAME_1,GID_2,NAME_2,yield_kg_ha +SUGC,ARG,Argentina,ARG.1_1,Buenos Aires,ARG.1.90_1,Pergamino,19854.4 +SUGC,ARG,Argentina,ARG.10_1,Jujuy,ARG.10.1_1,Capital,29188.240474933697 +SUGC,ARG,Argentina,ARG.10_1,Jujuy,ARG.10.10_1,Santa Bárbara,46932.46539336436 +SUGC,ARG,Argentina,ARG.10_1,Jujuy,ARG.10.11_1,Santa Catalina,49805.38116139515 +SUGC,ARG,Argentina,ARG.10_1,Jujuy,ARG.10.12_1,Susques,49805.30000000001 +SUGC,ARG,Argentina,ARG.10_1,Jujuy,ARG.10.13_1,Tilcara,46280.71257289307 +SUGC,ARG,Argentina,ARG.10_1,Jujuy,ARG.10.14_1,Tumbaya,47757.15423375592 +SUGC,ARG,Argentina,ARG.10_1,Jujuy,ARG.10.15_1,Valle Grande,54881.80637518701 +SUGC,ARG,Argentina,ARG.10_1,Jujuy,ARG.10.16_1,Yavi,49805.38753800312 
+SUGC,ARG,Argentina,ARG.10_1,Jujuy,ARG.10.2_1,Cochinoca,49805.34041967665 +SUGC,ARG,Argentina,ARG.10_1,Jujuy,ARG.10.3_1,El Carmen,48619.401160462075 +SUGC,ARG,Argentina,ARG.10_1,Jujuy,ARG.10.4_1,Humahuaca,49172.34633654877 +SUGC,ARG,Argentina,ARG.10_1,Jujuy,ARG.10.5_1,Ledesma,53482.789906191734 +SUGC,ARG,Argentina,ARG.10_1,Jujuy,ARG.10.6_1,Palpalá,43750.556377484485 +SUGC,ARG,Argentina,ARG.10_1,Jujuy,ARG.10.7_1,Rinconada,49805.060473139485 +SUGC,ARG,Argentina,ARG.10_1,Jujuy,ARG.10.8_1,San Antonio,42561.82008420683 +SUGC,ARG,Argentina,ARG.10_1,Jujuy,ARG.10.9_1,San Pedro,52069.72944720751 +SUGC,ARG,Argentina,ARG.14_1,Misiones,ARG.14.1_1,Apóstoles,45788.14242871358 +SUGC,ARG,Argentina,ARG.14_1,Misiones,ARG.14.10_1,Leandro N. Alem,52866.873742786476 +SUGC,ARG,Argentina,ARG.14_1,Misiones,ARG.14.11_1,Libertador General San Martín,71384.67925531915 +SUGC,ARG,Argentina,ARG.14_1,Misiones,ARG.14.12_1,Montecarlo,54099.663654223965 +SUGC,ARG,Argentina,ARG.14_1,Misiones,ARG.14.13_1,Oberá,58340.65048167971 +SUGC,ARG,Argentina,ARG.14_1,Misiones,ARG.14.15_1,San Javier,49894.30467494522 +SUGC,ARG,Argentina,ARG.14_1,Misiones,ARG.14.16_1,San Pedro,51524.88748526986 +SUGC,ARG,Argentina,ARG.14_1,Misiones,ARG.14.17_1,Veinticinco de Mayo,70334.71265822784 +SUGC,ARG,Argentina,ARG.14_1,Misiones,ARG.14.2_1,Cainguás,47314.990655947 +SUGC,ARG,Argentina,ARG.14_1,Misiones,ARG.14.3_1,Candelaria,119126.70000000001 +SUGC,ARG,Argentina,ARG.14_1,Misiones,ARG.14.5_1,Concepción,45712.78894142208 +SUGC,ARG,Argentina,ARG.14_1,Misiones,ARG.14.8_1,Guaraní,45449.327623361576 +SUGC,ARG,Argentina,ARG.17_1,Salta,ARG.17.1_1,Anta,37446.81040222355 +SUGC,ARG,Argentina,ARG.17_1,Salta,ARG.17.10_1,Iruya,49805.3 +SUGC,ARG,Argentina,ARG.17_1,Salta,ARG.17.12_1,La Candelaria,22359.937331334335 +SUGC,ARG,Argentina,ARG.17_1,Salta,ARG.17.14_1,La Viña,131465.8 +SUGC,ARG,Argentina,ARG.17_1,Salta,ARG.17.16_1,Metán,25897.574188690243 +SUGC,ARG,Argentina,ARG.17_1,Salta,ARG.17.18_1,Orán,97793.52149949195 
+SUGC,ARG,Argentina,ARG.17_1,Salta,ARG.17.20_1,Rosario de la Frontera,23338.872117794486 +SUGC,ARG,Argentina,ARG.17_1,Salta,ARG.17.21_1,Rosario de Lerma,108360.55736706489 +SUGC,ARG,Argentina,ARG.17_1,Salta,ARG.17.5_1,Cerrillos,103217.25899204242 +SUGC,ARG,Argentina,ARG.17_1,Salta,ARG.17.6_1,Chicoana,130664.8 +SUGC,ARG,Argentina,ARG.17_1,Salta,ARG.17.7_1,General Güemes,91525.68784473951 +SUGC,ARG,Argentina,ARG.17_1,Salta,ARG.17.8_1,General José de San Martín,22361.624398546825 +SUGC,ARG,Argentina,ARG.21_1,Santa Fe,ARG.21.1_1,Belgrano,19854.399999999998 +SUGC,ARG,Argentina,ARG.21_1,Santa Fe,ARG.21.10_1,Las Colonias,19854.4 +SUGC,ARG,Argentina,ARG.21_1,Santa Fe,ARG.21.11_1,Nueve de Julio,19854.4 +SUGC,ARG,Argentina,ARG.21_1,Santa Fe,ARG.21.12_1,Rosario,78817.94127054642 +SUGC,ARG,Argentina,ARG.21_1,Santa Fe,ARG.21.13_1,San Cristóbal,19854.4 +SUGC,ARG,Argentina,ARG.21_1,Santa Fe,ARG.21.15_1,San Jerónimo,117996.28643894661 +SUGC,ARG,Argentina,ARG.21_1,Santa Fe,ARG.21.16_1,San Justo,19854.4 +SUGC,ARG,Argentina,ARG.21_1,Santa Fe,ARG.21.17_1,San Lorenzo,19854.4 +SUGC,ARG,Argentina,ARG.21_1,Santa Fe,ARG.21.18_1,San Martín,19854.4 +SUGC,ARG,Argentina,ARG.21_1,Santa Fe,ARG.21.19_1,Vera,19854.4 +SUGC,ARG,Argentina,ARG.21_1,Santa Fe,ARG.21.2_1,Caseros,53331.14668011854 +SUGC,ARG,Argentina,ARG.21_1,Santa Fe,ARG.21.3_1,Castellanos,19854.4 +SUGC,ARG,Argentina,ARG.21_1,Santa Fe,ARG.21.4_1,Constitución,75127.47022117091 +SUGC,ARG,Argentina,ARG.21_1,Santa Fe,ARG.21.6_1,General López,97098.16878587517 +SUGC,ARG,Argentina,ARG.21_1,Santa Fe,ARG.21.7_1,General Obligado,19854.399999999998 +SUGC,ARG,Argentina,ARG.21_1,Santa Fe,ARG.21.8_1,Iriondo,19854.4 +SUGC,ARG,Argentina,ARG.22_1,Santiago del Estero,ARG.22.22_1,Rivadavia,19854.4 +SUGC,ARG,Argentina,ARG.3_1,Chaco,ARG.3.2_1,Bermejo,19873.6 +SUGC,ARG,Argentina,ARG.6_1,Córdoba,ARG.6.20_1,San Justo,19854.4 +SUGC,ARG,Argentina,ARG.6_1,Córdoba,ARG.6.9_1,Marcos Juárez,19854.4 +SUGC,ARG,Argentina,ARG.7_1,Corrientes,ARG.7.1_1,Bella 
Vista,53558.95686274509 +SUGC,ARG,Argentina,ARG.7_1,Corrientes,ARG.7.11_1,Itatí,39731.99487179487 +SUGC,ARG,Argentina,ARG.7_1,Corrientes,ARG.7.13_1,Lavalle,44892.87526881721 +SUGC,ARG,Argentina,ARG.7_1,Corrientes,ARG.7.15_1,Mercedes,51588.797826086964 +SUGC,ARG,Argentina,ARG.7_1,Corrientes,ARG.7.16_1,Monte Caseros,45615.51245421246 +SUGC,ARG,Argentina,ARG.7_1,Corrientes,ARG.7.17_1,Paso de los Libres,52641.50293577982 +SUGC,ARG,Argentina,ARG.7_1,Corrientes,ARG.7.18_1,Saladas,53520.56595744681 +SUGC,ARG,Argentina,ARG.7_1,Corrientes,ARG.7.2_1,Berón de Astrada,50609.59548022599 +SUGC,ARG,Argentina,ARG.7_1,Corrientes,ARG.7.21_1,San Martín,41516.92402234637 +SUGC,ARG,Argentina,ARG.7_1,Corrientes,ARG.7.22_1,San Miguel,23062.8 +SUGC,ARG,Argentina,ARG.7_1,Corrientes,ARG.7.23_1,San Roque,19749.3 +SUGC,ARG,Argentina,ARG.7_1,Corrientes,ARG.7.24_1,Santo Tomé,48128.03928571429 +SUGC,ARG,Argentina,ARG.7_1,Corrientes,ARG.7.25_1,Sauce,33922.464545454546 +SUGC,ARG,Argentina,ARG.7_1,Corrientes,ARG.7.4_1,Concepción,53639.2 +SUGC,ARG,Argentina,ARG.7_1,Corrientes,ARG.7.5_1,Curuzú Cuatiá,46239.30101351351 +SUGC,ARG,Argentina,ARG.7_1,Corrientes,ARG.7.6_1,Empedrado,53217.7 +SUGC,ARG,Argentina,ARG.7_1,Corrientes,ARG.7.7_1,Esquina,19749.299999999996 +SUGC,ARG,Argentina,ARG.7_1,Corrientes,ARG.7.8_1,General Alvear,53628.8 +SUGC,ARG,Argentina,ARG.7_1,Corrientes,ARG.7.9_1,General Paz,53633.15 +SUGC,ARG,Argentina,ARG.9_1,Formosa,ARG.9.1_1,Bermejo,80698.01644444444 +SUGC,ARG,Argentina,ARG.9_1,Formosa,ARG.9.2_1,Formosa,19873.6 +SUGC,ARG,Argentina,ARG.9_1,Formosa,ARG.9.3_1,Laishi,105994.1810298103 +SUGC,ARG,Argentina,ARG.9_1,Formosa,ARG.9.5_1,Patiño,41805.03956834532 +SUGC,ARG,Argentina,ARG.9_1,Formosa,ARG.9.6_1,Pilagás,19948.938461538462 +SUGC,ARG,Argentina,ARG.9_1,Formosa,ARG.9.7_1,Pilcomayo,46621.481286549715 +SUGC,ARG,Argentina,ARG.9_1,Formosa,ARG.9.8_1,Pirané,19873.6 +SUGC,ARG,Argentina,ARG.9_1,Formosa,ARG.9.9_1,Ramón Lista,46573.39598853869 diff --git 
a/src/test/scala/org/globalforestwatch/summarystats/ghg/GHGAnalysisSpec.scala b/src/test/scala/org/globalforestwatch/summarystats/ghg/GHGAnalysisSpec.scala new file mode 100644 index 00000000..bc6ec4af --- /dev/null +++ b/src/test/scala/org/globalforestwatch/summarystats/ghg/GHGAnalysisSpec.scala @@ -0,0 +1,70 @@ +package org.globalforestwatch.summarystats.ghg + +import cats.data.NonEmptyList +import com.github.mrpowers.spark.fast.tests.DataFrameComparer +import geotrellis.vector._ +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, SaveMode,Row} +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types.IntegerType +import org.globalforestwatch.features.{FeatureFilter, ValidatedFeatureRDD} +import org.globalforestwatch.summarystats.ValidatedLocation +import org.globalforestwatch.{TestEnvironment, ProTag} +import org.globalforestwatch.config.GfwConfig +import org.apache.spark.broadcast.Broadcast + +class GHGAnalysisSpec extends TestEnvironment with DataFrameComparer { + def ghgInputTsvPath = getClass.getResource("/ghg.tsv").toString() + def gadm2YieldPath = getClass.getResource("/part_yield_spam_gadm2.csv").toString() + def ghgExpectedOutputPath = getClass.getResource("/ghg-output").toString() + + def Ghg(features: RDD[ValidatedLocation[Geometry]], broadcastArray: Broadcast[Array[Row]]) = { + GHGAnalysis( + features, + kwargs = Map( + "config" -> GfwConfig.get(), + "backupYield" -> broadcastArray, + "includeFeatureId" -> true) + ) + } + + /** Function to update expected results when this test becomes invalid */ + def saveExpectedFcdResult(fcd: DataFrame): Unit = { + fcd.repartition(1) + .write + .mode(SaveMode.Overwrite) + .options(GHGExport.csvOptions) + .csv(path = ghgExpectedOutputPath) + } + + def readExpectedFcdResult = { + val csvFile = spark.read + .options(GHGExport.csvOptions) + .csv(ghgExpectedOutputPath) + // status_code gets interpreted as string type, so cast + // it to its correct integer type. 
+ csvFile.withColumn("status_code", col("status_code").cast(IntegerType)) + } + + it("matches recorded output for various locations and commodities", ProTag) { + val ghgFeatures = ValidatedFeatureRDD( + NonEmptyList.one(ghgInputTsvPath), + "gfwpro_ext", + FeatureFilter.empty, + splitFeatures = true + ) + val backupDF = spark.read + .options(Map("header" -> "true", "delimiter" -> ",", "escape" -> "\"")) + .csv(gadm2YieldPath) + val broadcastArray = spark.sparkContext.broadcast(backupDF.collect()) + val fcd = Ghg(ghgFeatures, broadcastArray) + val summaryDF = GHGDF.getFeatureDataFrame(fcd, spark) + summaryDF.collect().foreach(println) + //saveExpectedFcdResult(summaryDF) + + val expectedDF = readExpectedFcdResult + + assertSmallDataFrameEquality(summaryDF, expectedDF, orderedComparison = false) + } +} + From 6856c00a4cee5fa8af770693060794c5a3ce3568 Mon Sep 17 00:00:00 2001 From: Dan Scales Date: Fri, 24 Jan 2025 15:10:05 -0800 Subject: [PATCH 2/2] Changes from Justin's review comments. --- .../summarystats/ErrorSummaryRDD.scala | 3 --- .../summarystats/ghg/GHGRawDataGroup.scala | 7 +++++-- .../summarystats/ghg/GHGSummary.scala | 12 ++++-------- 3 files changed, 9 insertions(+), 13 deletions(-) diff --git a/src/main/scala/org/globalforestwatch/summarystats/ErrorSummaryRDD.scala b/src/main/scala/org/globalforestwatch/summarystats/ErrorSummaryRDD.scala index b4e505c7..4c63202e 100644 --- a/src/main/scala/org/globalforestwatch/summarystats/ErrorSummaryRDD.scala +++ b/src/main/scala/org/globalforestwatch/summarystats/ErrorSummaryRDD.scala @@ -166,9 +166,6 @@ trait ErrorSummaryRDD extends LazyLogging with java.io.Serializable { // if there are any full window intersections, we only need to calculate // the summary for the window, and then tie it to each feature ID val fullWindowIds = fullWindowFeatures.map { case feature => feature.data}.toList - //if (fullWindowIds.size >= 2) { - // println(s"Re-using results from same full tile ${windowKey} for featureIds 
${fullWindowIds}") - //} val fullWindowResults = if (fullWindowFeatures.nonEmpty) { fullWindowIds.map { id => (id, getSummaryForGeom(windowGeom, kwargs)) } diff --git a/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGRawDataGroup.scala b/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGRawDataGroup.scala index 86ebbd2b..332ed060 100644 --- a/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGRawDataGroup.scala +++ b/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGRawDataGroup.scala @@ -5,6 +5,9 @@ import scala.collection.immutable.SortedMap case class GHGRawDataGroup(umdTreeCoverLossYear: Int, cropYield: Double ) { + val DiscountNumberOfYears = 20 + val DiscountStartValue = 0.0975 + val DiscountDecreasePerYear = 0.005 /** Produce a partial GHGData for the loss year, yield, emissions, and area in this * data group */ @@ -21,8 +24,8 @@ case class GHGRawDataGroup(umdTreeCoverLossYear: Int, val efList = for (i <- minLossYear to maxLossYear) yield { val diff = i - umdTreeCoverLossYear - if (diff >= 0 && diff < 20) { - (i -> ((0.0975 - diff * 0.005) * emissionsCo2e) / (cropYield * totalArea)) + if (diff >= 0 && diff < DiscountNumberOfYears) { + (i -> ((DiscountStartValue - diff * DiscountDecreasePerYear) * emissionsCo2e) / (cropYield * totalArea)) } else { (i -> 0.0) } diff --git a/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGSummary.scala b/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGSummary.scala index 4e0d21d3..b214acad 100644 --- a/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGSummary.scala +++ b/src/main/scala/org/globalforestwatch/summarystats/ghg/GHGSummary.scala @@ -66,12 +66,12 @@ object GHGSummary { row: Int): Unit = { // Look up the "backup" yield based on gadm area (possibly using a cached value). 
- def lookupBackupYield(backupArray: Array[Row], commodity: String, gadmId: String): Float = { + def lookupBackupYield(backupYieldArray: Array[Row], commodity: String, gadmId: String): Float = { val cached = backupYieldCache.get(CacheKey(commodity, gadmId)) if (cached.isDefined) { return cached.get } - for (r <- backupArray) { + for (r <- backupYieldArray) { if (r.getAs[String]("GID_2") == gadmId && r.getAs[String]("commodity") == commodity) { val cropYield = r.getAs[String]("yield_kg_ha").toFloat backupYieldCache(CacheKey(commodity, gadmId)) = cropYield @@ -142,9 +142,8 @@ object GHGSummary { val gadmAdm1: Integer = raster.tile.gadmAdm1.getData(col, row) val gadmAdm2: Integer = raster.tile.gadmAdm2.getData(col, row) val gadmId: String = s"$gadmAdm0.$gadmAdm1.${gadmAdm2}_1" - //println(s"Empty ${featureId.commodity} default yield, checking gadm yield for $gadmId") - val backupArray = kwargs("backupYield").asInstanceOf[Broadcast[Array[Row]]].value - val backupYield = lookupBackupYield(backupArray, featureId.commodity, gadmId) + val backupYieldArray = kwargs("backupYield").asInstanceOf[Broadcast[Array[Row]]].value + val backupYield = lookupBackupYield(backupYieldArray, featureId.commodity, gadmId) backupYield } } @@ -157,9 +156,6 @@ object GHGSummary { val groupKey = GHGRawDataGroup(umdTreeCoverLossYear, cropYield) - // if (umdTreeCoverLossYear > 0) { - // println(s"Yield $cropYield, lossYear $umdTreeCoverLossYear, area $areaHa, co2e ${grossEmissionsCo2eNonCo2Pixel + grossEmissionsCo2eCo2OnlyPixel}") - // } val summaryData: GHGRawData = acc.stats.getOrElse( key = groupKey,