GEOMESA-3259 FSDS - Support GeoParquet for Point geometries #3254

Merged 2 commits on Jan 10, 2025
@@ -61,6 +61,12 @@
<groupId>org.specs2</groupId>
<artifactId>specs2-junit_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>com.github.erosb</groupId>
<artifactId>everit-json-schema</artifactId>
<version>1.14.4</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
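The new everit-json-schema test dependency presumably backs a test that validates the metadata emitted by the writer against the GeoParquet JSON schema added below. A minimal sketch of such a check, assuming the schema is available as a test resource (the resource name and sample document are illustrative, not taken from this PR):

import org.everit.json.schema.loader.SchemaLoader
import org.json.{JSONObject, JSONTokener}

// load the bundled GeoParquet schema from the test classpath (hypothetical resource name);
// note the schema's remote $ref to the projjson schema may require network access to resolve
val stream = getClass.getClassLoader.getResourceAsStream("geoparquet-1.1.0-schema.json")
val schema = try { SchemaLoader.load(new JSONObject(new JSONTokener(stream))) } finally { stream.close() }

// validate a generated "geo" metadata document; throws org.everit.json.schema.ValidationException on failure
val geo = """{"version":"1.1.0","primary_column":"geom","columns":{"geom":{"encoding":"point","geometry_types":["Point"]}}}"""
schema.validate(new JSONObject(geo))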
@@ -0,0 +1,166 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "GeoParquet",
"description": "Parquet metadata included in the geo field.",
"type": "object",
"required": [
"version",
"primary_column",
"columns"
],
"properties": {
"version": {
"type": "string",
"const": "1.1.0"
},
"primary_column": {
"type": "string",
"minLength": 1
},
"columns": {
"type": "object",
"minProperties": 1,
"patternProperties": {
".+": {
"type": "object",
"required": [
"encoding",
"geometry_types"
],
"properties": {
"encoding": {
"type": "string",
"pattern": "^(WKB|point|linestring|polygon|multipoint|multilinestring|multipolygon)$"
},
"geometry_types": {
"type": "array",
"uniqueItems": true,
"items": {
"type": "string",
"pattern": "^(GeometryCollection|(Multi)?(Point|LineString|Polygon))( Z)?$"
}
},
"crs": {
"oneOf": [
{
"$ref": "https://proj.org/schemas/v0.7/projjson.schema.json"
},
{
"type": "null"
}
]
},
"edges": {
"type": "string",
"enum": [
"planar",
"spherical"
]
},
"orientation": {
"type": "string",
"const": "counterclockwise"
},
"bbox": {
"type": "array",
"items": {
"type": "number"
},
"oneOf": [
{
"description": "2D bbox consisting of (xmin, ymin, xmax, ymax)",
"minItems": 4,
"maxItems": 4
},
{
"description": "3D bbox consisting of (xmin, ymin, zmin, xmax, ymax, zmax)",
"minItems": 6,
"maxItems": 6
}
]
},
"epoch": {
"type": "number"
},
"covering": {
"type": "object",
"required": [
"bbox"
],
"properties": {
"bbox": {
"type": "object",
"required": [
"xmin",
"xmax",
"ymin",
"ymax"
],
"properties": {
"xmin": {
"type": "array",
"items": [
{
"type": "string",
"minLength": 1
},
{
"const": "xmin"
}
],
"minItems": 2,
"maxItems": 2
},
"xmax": {
"type": "array",
"items": [
{
"type": "string",
"minLength": 1
},
{
"const": "xmax"
}
],
"minItems": 2,
"maxItems": 2
},
"ymin": {
"type": "array",
"items": [
{
"type": "string",
"minLength": 1
},
{
"const": "ymin"
}
],
"minItems": 2,
"maxItems": 2
},
"ymax": {
"type": "array",
"items": [
{
"type": "string",
"minLength": 1
},
{
"const": "ymax"
}
],
"minItems": 2,
"maxItems": 2
}
}
}
}
}
}
}
},
"additionalProperties": false
}
}
}
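For reference, a minimal "geo" metadata document that validates against this schema, with an illustrative point column named geom:

{
  "version": "1.1.0",
  "primary_column": "geom",
  "columns": {
    "geom": {
      "encoding": "point",
      "geometry_types": ["Point"],
      "bbox": [-122.5, 37.7, -122.3, 37.9]
    }
  }
}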
@@ -9,20 +9,25 @@

package org.locationtech.geomesa.fs.storage.parquet.io

import com.google.gson.GsonBuilder
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.api.InitContext
import org.apache.parquet.hadoop.metadata.FileMetaData
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.Type.Repetition
import org.apache.parquet.schema.Types.BasePrimitiveBuilder
import org.apache.parquet.schema._
import org.geotools.api.feature.`type`.AttributeDescriptor
import org.geotools.api.feature.simple.SimpleFeatureType
import org.geotools.api.feature.`type`.{AttributeDescriptor, GeometryDescriptor}
import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.locationtech.geomesa.features.serialization.TwkbSerialization.GeometryBytes
import org.locationtech.geomesa.fs.storage.common.jobs.StorageConfiguration
import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserver
import org.locationtech.geomesa.utils.geotools.ObjectType.ObjectType
import org.locationtech.geomesa.utils.geotools.{ObjectType, SimpleFeatureTypes}
import org.locationtech.geomesa.utils.text.StringSerialization
import org.locationtech.jts.geom.{Envelope, Geometry, GeometryCollection}

import java.util.{Collections, Locale}

/**
* A paired simple feature type and parquet schema
@@ -256,4 +261,97 @@ object SimpleFeatureParquetSchema
def apply(binding: ObjectType): Binding =
bindings.getOrElse(binding, throw new NotImplementedError(s"No mapping defined for type $binding"))
}

object GeoParquetMetadata {

val GeoParquetMetadataKey = "geo"

private val gson = new GsonBuilder().disableHtmlEscaping().create()

// TODO "covering" - per row bboxes can be used to accelerate spatial queries at the row group/page level

/**
* Indicates whether an attribute is supported.
*
* Currently only mixed geometry (encoded as WKB) and point types are encoded correctly as GeoParquet.
*
* TODO GEOMESA-3259 support non-point geometries
*
* @param d descriptor
* @return
*/
private[parquet] def supported(d: GeometryDescriptor): Boolean = {
ObjectType.selectType(d).last match {
case ObjectType.POINT | ObjectType.GEOMETRY => true
case _ => false
}
}

/**
* Generate JSON describing the GeoParquet file, conforming to the GeoParquet 1.1.0 metadata schema
*
* @param primaryGeometry name of the primary geometry column in the file
* @param geometries pairs of geometry descriptors and optional bounds
* @return
*/
def apply(primaryGeometry: String, geometries: Seq[(GeometryDescriptor, Option[Envelope])]): String = {
val cols = geometries.map { case (descriptor, bounds) =>
val (encoding, types) = descriptor.getType.getBinding match {
case b if b == classOf[Geometry] => ("WKB", Collections.emptyList())
case b if b == classOf[GeometryCollection] => ("WKB", Collections.singletonList(classOf[GeometryCollection].getSimpleName))
case b => (b.getSimpleName.toLowerCase(Locale.US), Collections.singletonList(b.getSimpleName))
}
val metadata = new java.util.HashMap[String, AnyRef]
metadata.put("encoding", encoding)
metadata.put("geometry_types", types) // TODO add ' Z' for 3d points
bounds.filterNot(_.isNull).foreach { e =>
metadata.put("bbox", java.util.List.of(e.getMinX, e.getMinY, e.getMaxX, e.getMaxY)) // TODO add z for 3d points
}
StringSerialization.alphaNumericSafeString(descriptor.getLocalName) -> metadata
}

val model = java.util.Map.of(
"version", "1.1.0",
"primary_column", primaryGeometry,
"columns", cols.toMap.asJava
)

gson.toJson(model)
}

/**
* Observer class that generates GeoParquet metadata
*
* @param sft simple feature type
*/
class GeoParquetObserver(sft: SimpleFeatureType) extends FileSystemObserver {

import scala.collection.JavaConverters._

private val bounds =
sft.getAttributeDescriptors.asScala.zipWithIndex.collect {
case (d: GeometryDescriptor, i) if supported(d) => (d, i, new Envelope())
}

def metadata(): java.util.Map[String, String] = {
if (bounds.isEmpty) { Collections.emptyMap() } else {
val geoms = bounds.map { case (d, _, env) => (d, Some(env).filterNot(_.isNull)) }
val primary = bounds.find(_._1 == sft.getGeometryDescriptor).getOrElse(bounds.head)._1.getLocalName
Collections.singletonMap(GeoParquetMetadataKey, GeoParquetMetadata(primary, geoms.toSeq))
}
}

override def write(feature: SimpleFeature): Unit = {
bounds.foreach { case (_, i, envelope) =>
val geom = feature.getAttribute(i).asInstanceOf[Geometry]
if (geom != null) {
envelope.expandToInclude(geom.getEnvelopeInternal)
}
}
}

override def flush(): Unit = {}
override def close(): Unit = {}
}
}
}
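A minimal sketch (not part of the diff) of exercising GeoParquetMetadata.apply directly; the feature type spec and envelope values are illustrative:

import org.locationtech.geomesa.fs.storage.parquet.io.SimpleFeatureParquetSchema.GeoParquetMetadata
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes
import org.locationtech.jts.geom.Envelope

val sft = SimpleFeatureTypes.createType("example", "name:String,*geom:Point:srid=4326")
val descriptor = sft.getGeometryDescriptor
// JTS envelopes are constructed as (x1, x2, y1, y2)
val bounds = Some(new Envelope(-122.5, -122.3, 37.7, 37.9))
val json = GeoParquetMetadata(descriptor.getLocalName, Seq((descriptor, bounds)))
// json would be roughly:
// {"version":"1.1.0","primary_column":"geom","columns":{"geom":{"encoding":"point","geometry_types":["Point"],"bbox":[-122.5,37.7,-122.3,37.9]}}}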
@@ -10,12 +10,14 @@ package org.locationtech.geomesa.fs.storage.parquet.io

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
import org.apache.parquet.hadoop.api.WriteSupport.{FinalizedWriteContext, WriteContext}
import org.apache.parquet.io.api.{Binary, RecordConsumer}
import org.geotools.api.feature.`type`.AttributeDescriptor
import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.locationtech.geomesa.fs.storage.parquet.io.SimpleFeatureParquetSchema.GeoParquetMetadata.GeoParquetObserver
import org.locationtech.geomesa.utils.geotools.ObjectType
import org.locationtech.geomesa.utils.geotools.ObjectType.ObjectType
import org.locationtech.geomesa.utils.io.CloseWithLogging
import org.locationtech.geomesa.utils.text.WKBUtils
import org.locationtech.jts.geom._

@@ -26,6 +28,8 @@ class SimpleFeatureWriteSupport extends WriteSupport[SimpleFeature] {

private var writer: SimpleFeatureWriteSupport.SimpleFeatureWriter = _
private var consumer: RecordConsumer = _
private var geoParquetObserver: GeoParquetObserver = _
private var baseMetadata: java.util.Map[String, String] = _

override val getName: String = "SimpleFeatureWriteSupport"

@@ -35,6 +39,8 @@
throw new IllegalArgumentException("Could not extract SimpleFeatureType from write context")
}
this.writer = SimpleFeatureWriteSupport.SimpleFeatureWriter(schema.sft)
this.geoParquetObserver = new GeoParquetObserver(schema.sft)
this.baseMetadata = schema.metadata

new WriteContext(schema.schema, schema.metadata)
}
@@ -43,7 +49,22 @@
override def prepareForWrite(recordConsumer: RecordConsumer): Unit = consumer = recordConsumer

// called per row
override def write(record: SimpleFeature): Unit = writer.write(consumer, record)
override def write(record: SimpleFeature): Unit = {
writer.write(consumer, record)
geoParquetObserver.write(record)
}

// called once at the end
override def finalizeWrite(): FinalizedWriteContext = {
try {
val metadata = new java.util.HashMap[String, String]()
metadata.putAll(baseMetadata)
metadata.putAll(geoParquetObserver.metadata())
new FinalizedWriteContext(metadata)
} finally {
CloseWithLogging(geoParquetObserver)
}
}
}

object SimpleFeatureWriteSupport {
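For reference, the "geo" entry produced in finalizeWrite lands in the Parquet file footer's key-value metadata, where it can be read back with the standard Parquet footer API. A minimal sketch (the file path is illustrative):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

val reader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path("/tmp/example.parquet"), new Configuration()))
try {
  // null if the file was written without any supported geometry columns
  val geo = reader.getFooter.getFileMetaData.getKeyValueMetaData.get("geo")
  println(geo)
} finally {
  reader.close()
}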