Merge pull request #30 from geotrellis/feature/awf/image-pyramid
Render raster image pyramid for forgotten pop
CloudNiner authored Oct 30, 2019
2 parents 3952619 + d4bcea7 commit 66baeab
Showing 5 changed files with 180 additions and 83 deletions.
8 changes: 5 additions & 3 deletions src/main/scala/geotrellis/sdg/OutputProperties.scala
@@ -9,21 +9,23 @@ case class OutputProperties(
pop_urban: Double,
pop_rural: Double,
pop_served: Double,
pct_served: Double
pct_served: Double,
breaks: Array[Double]
)

object OutputProperties {
implicit val fooDecoder: Decoder[OutputProperties] = deriveDecoder[OutputProperties]
implicit val fooEncoder: Encoder[OutputProperties] = deriveEncoder[OutputProperties]

def apply(country: Country, summary: PopulationSummary): OutputProperties =
def apply(country: Country, summary: PopulationSummary, breaks: Array[Double]): OutputProperties =
new OutputProperties(
code = country.code,
name = country.name,
pop = summary.total,
pop_urban = summary.urban,
pop_rural = summary.rural,
pop_served = summary.ruralServed,
pct_served = (summary.ruralServed / summary.rural)
pct_served = (summary.ruralServed / summary.rural),
breaks = breaks
)
}
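
With the new breaks field, each country feature's properties now carry the color breaks inline. An illustrative sketch of the derived circe encoding (all values below are made up, not project data):

import io.circe.syntax._

// Hypothetical values; pct_served = pop_served / pop_rural = 0.75
val props = OutputProperties(
  code = "USA", name = "United States", pop = 3.2e8,
  pop_urban = 2.6e8, pop_rural = 6.0e7, pop_served = 4.5e7,
  pct_served = 0.75, breaks = Array(0.5, 1.0, 2.5)
)
props.asJson.noSpaces
// {"code":"USA","name":"United States",...,"breaks":[0.5,1.0,2.5]}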
91 changes: 83 additions & 8 deletions src/main/scala/geotrellis/sdg/OutputPyramid.scala
@@ -2,6 +2,7 @@ package geotrellis.sdg

import java.net.URI

import com.uber.h3core.H3Core
import geotrellis.spark.pyramid.Pyramid
import geotrellis.spark._
import geotrellis.raster._
@@ -10,17 +11,30 @@ import geotrellis.proj4.WebMercator
import geotrellis.raster.resample.Sum
import geotrellis.spark.store.LayerWriter
import geotrellis.spark.store.hadoop.SaveToHadoop
import geotrellis.spark.store.s3.SaveToS3
import geotrellis.store.{AttributeStore, LayerId}
import geotrellis.store.index.ZCurveKeyIndexMethod
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import software.amazon.awssdk.services.s3.model.ObjectCannedACL
import vectorpipe.VectorPipe


object OutputPyramid {

/**
* Generate an image pyramid from the passed TileLayerRDD and render each
* zoom level to PNG tiles using the provided ColorMap.
*
* @param layer      base layer, pyramided from its native zoom down to zoom 0
* @param colorMap   color map used to render each tile to PNG
* @param outputPath S3 or local/HDFS prefix; tiles are written as {zoom}/{col}/{row}.png
*/
def savePng(
layer: TileLayerRDD[SpatialKey],
histogram: StreamingHistogram,
colorMap: ColorMap,
outputPath: String
): Unit = {
implicit val sc: SparkContext = layer.sparkContext
@@ -29,15 +43,17 @@ object OutputPyramid {
val pyramid = Pyramid.fromLayerRDD(baseLayer, Some(baseZoom), Some(0))

pyramid.levels.foreach { case (zoom, tileRdd) =>
val colorRamp = ColorRamps.Viridis
//val Some((min, max)) = histogram.minMaxValues()
//val breaks = for { break <- (0 to 64)} yield min + (break * (max-min) / 64)
val colorMap = colorRamp.toColorMap(histogram)

val imageRdd: RDD[(SpatialKey, Array[Byte])] =
tileRdd.mapValues(_.renderPng(colorMap).bytes)

SaveToHadoop(imageRdd, { (k: SpatialKey) => s"${outputPath}/${zoom}/${k.col}/${k.row}.png" })
val keyToPath = { k: SpatialKey => s"${outputPath}/${zoom}/${k.col}/${k.row}.png" }
if (outputPath.startsWith("s3")) {
SaveToS3(imageRdd, keyToPath, { request =>
request.toBuilder.acl(ObjectCannedACL.PUBLIC_READ).build()
})
} else {
SaveToHadoop(imageRdd, keyToPath)
}
}
}
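
For orientation, the per-tile operation that savePng maps over every pyramid level can be exercised on its own. A self-contained sketch with synthetic tile data and break points (illustrative values, not from the project):

import geotrellis.raster._
import geotrellis.raster.render.ColorRamps

// Build a synthetic 256x256 tile of doubles and render it to PNG bytes,
// mirroring tileRdd.mapValues(_.renderPng(colorMap).bytes) above.
val rnd = new scala.util.Random(42)
val tile: Tile = DoubleArrayTile(Array.fill(256 * 256)(rnd.nextDouble * 100), 256, 256)
val colorMap = ColorRamps.Viridis.toColorMap((0 to 100 by 10).map(_.toDouble).toArray)
val pngBytes: Array[Byte] = tile.renderPng(colorMap).bytes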

@@ -57,4 +73,63 @@ object OutputPyramid {
else writer.write(id, pyramid.levels(z), ZCurveKeyIndexMethod)
}
}

/**
* Generates a vector tile layer of aggregated hex bins representing the
* underlying population in the WorldPop 100m raster.
*
* Currently coded to work only with PopulationNearRoadsJob.forgottenLayer.
*
* Utilizes the Uber H3 hex library to index and generate hex geometries.
*
* @param layer     tile layer whose cell values are binned and summed per hexagon
* @param outputUri S3 or local file URI to write the layer to
* @param spark     active SparkSession used to build the DataFrame consumed by VectorPipe
*/
def forgottenLayerHexVectorTiles(layer: TileLayerRDD[SpatialKey], outputUri: URI)(implicit spark: SparkSession): Unit = {
import spark.implicits._
val maxZoom = 10
val minZoom = 6

val layout = layer.metadata.layout
val gridPointsRdd: RDD[(String, Double)] = layer.flatMap {
case (key: SpatialKey, tile: Tile) => {
val h3: H3Core = H3Core.newInstance
val tileExtent = key.extent(layout)
val re = RasterExtent(tileExtent, tile)
for {
col <- Iterator.range(0, tile.cols)
row <- Iterator.range(0, tile.rows)
v = tile.getDouble(col, row)
if isData(v)
} yield {
val (lon, lat) = re.gridToMap(col, row)
// A larger offset yields a coarser H3 resolution, i.e. larger hexagons;
// an offset of zero would make our starting maxZoom equal the H3 "resolution"
val hexZoomOffset = 2
val h3Index = h3.geoToH3Address(lat, lon, maxZoom - hexZoomOffset)
(h3Index, v)
}
}
}

val pipeline = ForgottenPopPipeline(
"geom",
outputUri,
maxZoom
)
val vpOptions = VectorPipe.Options(
maxZoom = maxZoom,
minZoom = Some(minZoom),
srcCRS = WebMercator,
destCRS = None,
useCaching = false,
orderAreas = false
)

val gridPointsDf = gridPointsRdd
.toDF("h3Index", "pop")
.withColumn("geom", pipeline.geomUdf(col("h3Index")))
VectorPipe(gridPointsDf, pipeline, vpOptions)
}
}
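
The flatMap above reduces each raster tile to (h3Address, population) pairs; summing those pairs per address yields one value per hexagon. A minimal standalone sketch of that aggregation, assuming a plain sum (in the real pipeline, ForgottenPopPipeline and VectorPipe handle aggregation and geometry generation):

import com.uber.h3core.H3Core
import org.apache.spark.SparkContext

def sumPopulationByHex(sc: SparkContext): Array[(String, Double)] = {
  // (lat, lon, pop) samples; coordinates are illustrative
  val points = Seq((45.520, -122.680, 10.0), (45.521, -122.681, 5.0))
  sc.parallelize(points)
    .map { case (lat, lon, pop) =>
      val h3 = H3Core.newInstance // created inside the task: H3Core is not serializable
      (h3.geoToH3Address(lat, lon, 8), pop) // resolution 8 = maxZoom 10 - hexZoomOffset 2
    }
    .reduceByKey(_ + _) // one summed population value per hex cell
    .collect()
}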
51 changes: 30 additions & 21 deletions src/main/scala/geotrellis/sdg/PopulationNearRoads.scala
@@ -1,25 +1,24 @@
package geotrellis.sdg

import java.io.PrintWriter
import java.net.URI

import cats.implicits._
import com.monovore.decline._
import geotrellis.layer._
import geotrellis.proj4.LatLng
import geotrellis.qatiles.RoadTags
import geotrellis.vector._
import geotrellis.vector.io.json.JsonFeatureCollection
import cats.implicits._
import com.monovore.decline._
import org.locationtech.geomesa.spark.jts._
import _root_.io.circe.syntax._
import org.apache.spark.storage.StorageLevel
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark._
import org.apache.spark.sql._
import java.io.PrintWriter
import java.net.URI

import geotrellis.qatiles.RoadTags
import org.locationtech.geomesa.spark.jts._

import scala.concurrent.{Await, Future}


/**
* Summary of population within 2km of roads
*/
@@ -80,7 +79,7 @@ object PopulationNearRoads extends CommandApp(

import scala.concurrent.ExecutionContext.Implicits.global

val result: Map[Country, PopulationSummary] = {
val result: List[(Country, PopulationSummary, Array[Double])] = {
val future = Future.traverse(countries)( country => Future {
spark.sparkContext.setJobGroup(country.code, country.name)

@@ -89,42 +88,52 @@ object PopulationNearRoads extends CommandApp(
val layout = LayoutDefinition(rasterSource.gridExtent, 256)

val job = new PopulationNearRoadsJob(country, grumpRdd, layout, LatLng,
{ t => RoadTags.includedValues.contains(t.highway) },
maxZoom = 10,
minZoom = 6)
{ t => RoadTags.includedValues.contains(t.highway.getOrElse("")) })

job.grumpMaskRdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
job.forgottenLayer.persist(StorageLevel.MEMORY_AND_DISK_SER)
val (summary, histogram) = job.result
val (colorMap, breaks) = SDGColorMaps.forgottenPop(histogram)

PopulationNearRoadsJob.layerToGeoTiff(job.forgottenLayer).write(s"/tmp/sdg-${country.code}-all-roads.tif")

outputCatalog.foreach { uri =>
OutputPyramid.saveLayer(job.forgottenLayer, histogram, uri, country.code)
}

println(s"Histogram: ${histogram.minValue}, ${histogram.maxValue}")
println(s"Breaks: ${breaks.mkString(",")}")

tileLayerUriPrefix match {
case Some(tileLayerUri) => {
job.forgottenLayerTiles(URI.create(s"$tileLayerUri/${country.code}/forgotten-pop"))
job.roadLayerTiles(URI.create(s"$tileLayerUri/${country.code}/roads"))
OutputPyramid.savePng(
job.forgottenLayer,
colorMap,
outputPath = s"$tileLayerUri/${country.code}/forgotten-pop"
)
job.roadLayerTiles(
URI.create(s"$tileLayerUri/${country.code}/roads"),
maxZoom = 10,
minZoom = 6
)
}
case _ => println("Skipped generating vector tile layers. Use --tileLayerUriPrefix to save.")
case _ => println("Skipped generating tile layers. Use --tileLayerUriPrefix to save.")
}

job.forgottenLayer.unpersist()
job.grumpMaskRdd.unpersist()

spark.sparkContext.clearJobGroup()
(country, summary)
(country, summary, breaks)
})
Await.result(future, scala.concurrent.duration.Duration.Inf).toMap
Await.result(future, scala.concurrent.duration.Duration.Inf)
}

result.foreach({ case (c, s) => println(c.code + " " + s.report) })
result.foreach({ case (c, s, _) => println(c.code + " " + s.report) })

val collection = JsonFeatureCollection()
result.foreach { case (country, summary) =>
val f = Feature(country.boundary, OutputProperties(country, summary).asJson)
result.foreach { case (country, summary, breaks) =>
val f = Feature(country.boundary, OutputProperties(country, summary, breaks).asJson)
collection.add(f)
}

@@ -144,4 +153,4 @@ object PopulationNearRoads extends CommandApp(
spark.stop
}
}
})
})
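
SDGColorMaps.forgottenPop is called above but is not part of this diff. A plausible sketch of such a helper, pairing histogram-derived quantile breaks with the Viridis ramp that the old savePng hard-coded (hypothetical: the real implementation may compute breaks differently):

import geotrellis.raster.histogram.StreamingHistogram
import geotrellis.raster.render.{ColorMap, ColorRamps}

// Hypothetical stand-in for SDGColorMaps.forgottenPop
def forgottenPop(histogram: StreamingHistogram): (ColorMap, Array[Double]) = {
  val breaks = histogram.quantileBreaks(12) // class breaks that follow the data distribution
  (ColorRamps.Viridis.toColorMap(breaks), breaks)
}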
55 changes: 4 additions & 51 deletions src/main/scala/geotrellis/sdg/PopulationNearRoadsJob.scala
@@ -1,5 +1,7 @@
package geotrellis.sdg

import java.net.URI

import geotrellis.layer._
import geotrellis.proj4._
import geotrellis.qatiles.{OsmQaTiles, RoadTags}
@@ -13,7 +15,6 @@ import geotrellis.vectortile.VectorTile
import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
import org.locationtech.jts.geom.TopologyException
import org.locationtech.jts.operation.union.CascadedPolygonUnion
import org.log4s._
@@ -22,9 +23,6 @@ import vectorpipe.VectorPipe

import scala.collection.JavaConverters._
import scala.collection.mutable
import java.net.URI

import com.uber.h3core.H3Core

/**
* Here we're going to start from country mbtiles and then do ranged reads
@@ -34,9 +32,7 @@ class PopulationNearRoadsJob(
grumpRdd: RDD[Geometry],
layout: LayoutDefinition,
crs: CRS,
roadFilter: RoadTags => Boolean,
maxZoom: Int,
minZoom: Int
roadFilter: RoadTags => Boolean
)(implicit spark: SparkSession) extends Serializable {
import PopulationNearRoadsJob._

@@ -132,7 +128,7 @@ class PopulationNearRoadsJob(
ContextRDD(rdd, md)
}

def roadLayerTiles(outputUri: URI): Unit = {
def roadLayerTiles(outputUri: URI, maxZoom: Int, minZoom: Int): Unit = {
import spark.implicits._

val filteredRoadsWithTagsRdd: RDD[(SpatialKey, MultiLineString, Long, String, String, Boolean)] =
@@ -163,49 +159,6 @@ class PopulationNearRoadsJob(
VectorPipe(filteredRoadsWithTagsDf, pipeline, vpOptions)
}

def forgottenLayerTiles(outputUri: URI): Unit = {
import spark.implicits._
val gridPointsRdd: RDD[(String, Double)] = forgottenLayer.flatMap {
case (key: SpatialKey, tile: Tile) => {
val h3: H3Core = H3Core.newInstance
val tileExtent = key.extent(layout)
val re = RasterExtent(tileExtent, tile)
for {
col <- Iterator.range(0, tile.cols)
row <- Iterator.range(0, tile.rows)
v = tile.getDouble(col, row)
if isData(v)
} yield {
val (lon, lat) = re.gridToMap(col, row)
// Higher number makes larger hexagons
// Zero means that our starting maxZoom == h3 hex "resolution"
val hexZoomOffset = 0
val h3Index = h3.geoToH3Address(lat, lon, maxZoom - hexZoomOffset)
(h3Index, v)
}
}
}

val pipeline = ForgottenPopPipeline(
"geom",
outputUri,
maxZoom
)
val vpOptions = VectorPipe.Options(
maxZoom = maxZoom,
minZoom = Some(minZoom),
srcCRS = WebMercator,
destCRS = None,
useCaching = false,
orderAreas = false
)

val gridPointsDf = gridPointsRdd
.toDF("h3Index", "pop")
.withColumn("geom", pipeline.geomUdf(col("h3Index")))
VectorPipe(gridPointsDf, pipeline, vpOptions)
}

lazy val result: (PopulationSummary, StreamingHistogram) =
popRegions.flatMap({ case (_, r) =>
for {