From 37d4eadaa3e3a025cb313816cbc61a26fd9b7342 Mon Sep 17 00:00:00 2001 From: Elie Pyatkevich Date: Mon, 28 Oct 2024 11:38:05 -0400 Subject: [PATCH] Arrow: introduce an axis order parameter Geomesa currently defaults to always reading and writing Arrow encoding as NORTH_EAST; the new axis order parameter allows the user to request that the encoding be explicitly specified per request. Axis Order parameter supported values: - NORTH_EAST translates to: North = Latitude = Y Axis, East = Longitude = X Axis - EAST_NORTH translates to: East = Longitude = X Axis, North = Latitude = Y Axis The request parameter is optional; if it is not specified, the axis order defaults to the existing behavior, which is NORTH_EAST. --- .../ArrowConversionProcessTest.scala | 12 ++--- .../arrow/vector/ArrowAttributeWriter.scala | 21 ++++++--- .../arrow/vector/SimpleFeatureVector.scala | 30 +++++++++--- geomesa-arrow/geomesa-arrow-jts/pom.xml | 4 ++ .../geomesa/arrow/jts/GeometryVector.java | 5 ++ .../geomesa/arrow/jts/WKBGeometryVector.java | 15 +++++- .../jts/impl/AbstractGeometryVector.java | 20 ++++++++ .../arrow/jts/impl/AbstractPointVector.java | 46 +++++++++++++++---- .../features/exporters/ArrowExporter.scala | 2 +- .../geomesa/index/conf/QueryHints.scala | 5 ++ .../geomesa/index/iterators/ArrowScan.scala | 15 ++++-- .../index/planning/LocalQueryRunner.scala | 2 +- .../index/view/MergedQueryRunner.scala | 2 +- .../transform/ArrowConversionProcess.scala | 7 ++- .../ArrowConversionProcessTest.scala | 20 ++++---- 15 files changed, 159 insertions(+), 47 deletions(-) diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/process/transform/ArrowConversionProcessTest.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/process/transform/ArrowConversionProcessTest.scala index fe91841e5b4b..be8dc85690b3 100644 --- 
a/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/process/transform/ArrowConversionProcessTest.scala +++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/process/transform/ArrowConversionProcessTest.scala @@ -44,7 +44,7 @@ class ArrowConversionProcessTest extends TestWithFeatureType { "ArrowConversionProcess" should { "encode an empty feature collection" in { - val bytes = process.execute(new ListFeatureCollection(sft), null, null, null, null, null, null, null, null).asScala.reduce(_ ++ _) + val bytes = process.execute(new ListFeatureCollection(sft), null, null, null, null, null, null, null, null, null).asScala.reduce(_ ++ _) WithClose(SimpleFeatureArrowFileReader.streaming(() => new ByteArrayInputStream(bytes))) { reader => reader.sft mustEqual sft SelfClosingIterator(reader.features()) must beEmpty @@ -52,7 +52,7 @@ class ArrowConversionProcessTest extends TestWithFeatureType { } "encode an empty accumulo feature collection" in { - val bytes = process.execute(fs.getFeatures(ECQL.toFilter("bbox(geom,20,20,30,30)")), null, null, null, null, null, null, null, null).asScala.reduce(_ ++ _) + val bytes = process.execute(fs.getFeatures(ECQL.toFilter("bbox(geom,20,20,30,30)")), null, null, null, null, null, null, null, null, null).asScala.reduce(_ ++ _) WithClose(SimpleFeatureArrowFileReader.streaming(() => new ByteArrayInputStream(bytes))) { reader => reader.sft mustEqual sft SelfClosingIterator(reader.features()) must beEmpty @@ -60,7 +60,7 @@ class ArrowConversionProcessTest extends TestWithFeatureType { } "encode an accumulo feature collection in distributed fashion" in { - val bytes = process.execute(fs.getFeatures(Filter.INCLUDE), null, null, null, null, null, null, null, null).asScala.reduce(_ ++ _) + val bytes = process.execute(fs.getFeatures(Filter.INCLUDE), null, null, null, null, null, null, null, null, null).asScala.reduce(_ ++ _) WithClose(SimpleFeatureArrowFileReader.streaming(() => 
new ByteArrayInputStream(bytes))) { reader => reader.sft mustEqual sft SelfClosingIterator(reader.features()).map(ScalaSimpleFeature.copy).toSeq must @@ -70,7 +70,7 @@ class ArrowConversionProcessTest extends TestWithFeatureType { "encode an accumulo feature collection in distributed fashion with calculated dictionary values" in { val filter = ECQL.toFilter("name = 'name0'") - val bytes = process.execute(fs.getFeatures(filter), null, null, null, Collections.singletonList("name"), null, null, null, null).asScala.reduce(_ ++ _) + val bytes = process.execute(fs.getFeatures(filter), null, null, null, Collections.singletonList("name"), null, null, null, null, null).asScala.reduce(_ ++ _) WithClose(SimpleFeatureArrowFileReader.streaming(() => new ByteArrayInputStream(bytes))) { reader => reader.sft mustEqual sft SelfClosingIterator(reader.features()).map(ScalaSimpleFeature.copy).toSeq must @@ -81,7 +81,7 @@ class ArrowConversionProcessTest extends TestWithFeatureType { } "sort and encode an accumulo feature collection in distributed fashion" in { - val bytes = process.execute(fs.getFeatures(Filter.INCLUDE), null, null, null, null, "dtg", null, null, null).asScala.reduce(_ ++ _) + val bytes = process.execute(fs.getFeatures(Filter.INCLUDE), null, null, null, null, "dtg", null, null, null, null).asScala.reduce(_ ++ _) WithClose(SimpleFeatureArrowFileReader.streaming(() => new ByteArrayInputStream(bytes))) { reader => reader.sft mustEqual sft SelfClosingIterator(reader.features()).map(ScalaSimpleFeature.copy).toList mustEqual features @@ -89,7 +89,7 @@ class ArrowConversionProcessTest extends TestWithFeatureType { } "reverse sort and encode an accumulo feature collection in distributed fashion" in { - val bytes = process.execute(fs.getFeatures(Filter.INCLUDE), null, null, null, null, "dtg", Boolean.box(true), null, null).asScala.reduce(_ ++ _) + val bytes = process.execute(fs.getFeatures(Filter.INCLUDE), null, null, null, null, "dtg", Boolean.box(true), null, null, 
null).asScala.reduce(_ ++ _) WithClose(SimpleFeatureArrowFileReader.streaming(() => new ByteArrayInputStream(bytes))) { reader => reader.sft mustEqual sft SelfClosingIterator(reader.features()).map(ScalaSimpleFeature.copy).toList mustEqual features.reverse diff --git a/geomesa-arrow/geomesa-arrow-gt/src/main/scala/org/locationtech/geomesa/arrow/vector/ArrowAttributeWriter.scala b/geomesa-arrow/geomesa-arrow-gt/src/main/scala/org/locationtech/geomesa/arrow/vector/ArrowAttributeWriter.scala index ace45e7a5ca5..7ab2f5853ea4 100644 --- a/geomesa-arrow/geomesa-arrow-gt/src/main/scala/org/locationtech/geomesa/arrow/vector/ArrowAttributeWriter.scala +++ b/geomesa-arrow/geomesa-arrow-gt/src/main/scala/org/locationtech/geomesa/arrow/vector/ArrowAttributeWriter.scala @@ -16,6 +16,7 @@ import org.apache.arrow.vector.types.Types.MinorType import org.apache.arrow.vector.types.pojo.{ArrowType, FieldType} import org.geotools.api.feature.`type`.AttributeDescriptor import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType} +import org.geotools.referencing.CRS.AxisOrder import org.locationtech.geomesa.arrow.jts._ import org.locationtech.geomesa.arrow.vector.SimpleFeatureVector.SimpleFeatureEncoding import org.locationtech.geomesa.arrow.vector.SimpleFeatureVector.SimpleFeatureEncoding.Encoding @@ -192,7 +193,7 @@ object ArrowAttributeWriter { case ObjectType.LONG => new ArrowLongWriter(name, metadata, factory) case ObjectType.FLOAT => new ArrowFloatWriter(name, metadata, factory) case ObjectType.DOUBLE => new ArrowDoubleWriter(name, metadata, factory) - case ObjectType.GEOMETRY => geometry(name, bindings(1), encoding.geometry, metadata, factory) + case ObjectType.GEOMETRY => geometry(name, bindings(1), encoding, metadata, factory) case ObjectType.BOOLEAN => new ArrowBooleanWriter(name, metadata, factory) case ObjectType.LIST => new ArrowListWriter(name, bindings(1), encoding, metadata, factory) case ObjectType.MAP => new ArrowMapWriter(name, bindings(1), bindings(2), 
encoding, metadata, factory) @@ -234,11 +235,11 @@ object ArrowAttributeWriter { private def geometry( name: String, binding: ObjectType, - encoding: Encoding, + encoding: SimpleFeatureEncoding, metadata: Map[String, String], factory: VectorFactory): ArrowGeometryWriter = { val m = metadata.asJava - val vector = (binding, encoding, factory) match { + val vector = (binding, encoding.geometry, factory) match { case (ObjectType.POINT, Encoding.Min, FromStruct(c)) => new PointFloatVector(name, c, m) case (ObjectType.POINT, Encoding.Min, FromAllocator(c)) => new PointFloatVector(name, c, m) case (ObjectType.POINT, Encoding.Max, FromStruct(c)) => new PointVector(name, c, m) @@ -269,7 +270,7 @@ object ArrowAttributeWriter { case (_, _, FromList(_)) => throw new NotImplementedError("Geometry lists are not supported") case _ => throw new IllegalArgumentException(s"Unexpected geometry type $binding") } - new ArrowGeometryWriter(name, vector.asInstanceOf[GeometryVector[Geometry, FieldVector]]) + new ArrowGeometryWriter(name, vector.asInstanceOf[GeometryVector[Geometry, FieldVector]], encoding) } trait ArrowDictionaryWriter extends ArrowAttributeWriter { @@ -464,13 +465,21 @@ object ArrowAttributeWriter { /** * Writes geometries - delegates to our JTS geometry vectors */ - class ArrowGeometryWriter(val name: String, delegate: GeometryVector[Geometry, FieldVector]) + class ArrowGeometryWriter(val name: String, delegate: GeometryVector[Geometry, FieldVector], encoding: SimpleFeatureEncoding) extends ArrowAttributeWriter { override def vector: FieldVector = delegate.getVector // note: delegate handles nulls - override def apply(i: Int, value: AnyRef): Unit = delegate.set(i, value.asInstanceOf[Geometry]) + override def apply(i: Int, value: AnyRef): Unit = { + val options = Map("axisOrder" -> (encoding.axisOrder match { + case Encoding.Max => AxisOrder.EAST_NORTH /* Lon/Lat */ + case _ => AxisOrder.NORTH_EAST /* Lat/Lon */ + })).asInstanceOf[Map[String, AnyRef]].asJava + + 
delegate.setOptions(options) + delegate.set(i, value.asInstanceOf[Geometry]) + } override def setValueCount(count: Int): Unit = delegate.setValueCount(count) } diff --git a/geomesa-arrow/geomesa-arrow-gt/src/main/scala/org/locationtech/geomesa/arrow/vector/SimpleFeatureVector.scala b/geomesa-arrow/geomesa-arrow-gt/src/main/scala/org/locationtech/geomesa/arrow/vector/SimpleFeatureVector.scala index 688fe61fba02..f3c962f3cf83 100644 --- a/geomesa-arrow/geomesa-arrow-gt/src/main/scala/org/locationtech/geomesa/arrow/vector/SimpleFeatureVector.scala +++ b/geomesa-arrow/geomesa-arrow-gt/src/main/scala/org/locationtech/geomesa/arrow/vector/SimpleFeatureVector.scala @@ -14,6 +14,9 @@ import org.apache.arrow.vector.types.FloatingPointPrecision import org.apache.arrow.vector.types.pojo.{ArrowType, FieldType} import org.apache.arrow.vector.{BigIntVector, FieldVector} import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType} +import org.geotools.referencing.CRS +import org.geotools.referencing.CRS.AxisOrder +import org.geotools.util.factory.Hints import org.locationtech.geomesa.arrow.ArrowAllocator import org.locationtech.geomesa.arrow.features.ArrowSimpleFeature import org.locationtech.geomesa.arrow.jts.GeometryFields @@ -132,16 +135,20 @@ object SimpleFeatureVector { val DescriptorKey = "descriptor" val OptionsKey = "options" - case class SimpleFeatureEncoding(fids: Option[Encoding], geometry: Encoding, date: Encoding) + case class SimpleFeatureEncoding(fids: Option[Encoding], geometry: Encoding, date: Encoding, axisOrder: Encoding) object SimpleFeatureEncoding { - val Min: SimpleFeatureEncoding = SimpleFeatureEncoding(Some(Encoding.Min), Encoding.Min, Encoding.Min) - val Max: SimpleFeatureEncoding = SimpleFeatureEncoding(Some(Encoding.Max), Encoding.Max, Encoding.Max) + val Min: SimpleFeatureEncoding = SimpleFeatureEncoding(Some(Encoding.Min), Encoding.Min, Encoding.Min, Encoding.Min) + val Max: SimpleFeatureEncoding = 
SimpleFeatureEncoding(Some(Encoding.Max), Encoding.Max, Encoding.Max, Encoding.Max) - def min(includeFids: Boolean, proxyFids: Boolean = false): SimpleFeatureEncoding = { + def min(includeFids: Boolean, proxyFids: Boolean = false, axisOrder: Option[AxisOrder] = None): SimpleFeatureEncoding = { val fids = if (includeFids) { Some(if (proxyFids) { Encoding.Min } else { Encoding.Max }) } else { None } - SimpleFeatureEncoding(fids, Encoding.Min, Encoding.Min) + val axisOrderEncoded = axisOrder match { + case Some(AxisOrder.EAST_NORTH) => Encoding.Max /* Lon/Lat */ + case _ => Encoding.Min /* Lat/Lon */ + } + SimpleFeatureEncoding(fids, Encoding.Min, Encoding.Min, axisOrderEncoded) } object Encoding extends Enumeration { @@ -245,7 +252,18 @@ object SimpleFeatureVector { val isLong = dateVector.exists(_.isInstanceOf[BigIntVector]) if (isLong) { Encoding.Max } else { Encoding.Min } } - val encoding = SimpleFeatureEncoding(fidEncoding, geomPrecision, datePrecision) + val axisOrderPrecision = { + if (java.lang.Boolean.getBoolean("org.geotools.referencing.forceXY") || + Hints.getSystemDefault(Hints.FORCE_LONGITUDE_FIRST_AXIS_ORDER) == java.lang.Boolean.TRUE) { + Encoding.Max /* Lon/Lat */ + } else { + CRS.getAxisOrder(sft.getCoordinateReferenceSystem) match { + case AxisOrder.EAST_NORTH => Encoding.Max /* Lon/Lat */ + case _ => Encoding.Min /* Lat/Lon */ + } + } + } + val encoding = SimpleFeatureEncoding(fidEncoding, geomPrecision, datePrecision, axisOrderPrecision) (sft, encoding) } diff --git a/geomesa-arrow/geomesa-arrow-jts/pom.xml b/geomesa-arrow/geomesa-arrow-jts/pom.xml index 3a4b7f6d2622..991956b6aa19 100644 --- a/geomesa-arrow/geomesa-arrow-jts/pom.xml +++ b/geomesa-arrow/geomesa-arrow-jts/pom.xml @@ -24,6 +24,10 @@ org.locationtech.jts jts-core + + org.geotools + gt-referencing + diff --git a/geomesa-arrow/geomesa-arrow-jts/src/main/java/org/locationtech/geomesa/arrow/jts/GeometryVector.java 
b/geomesa-arrow/geomesa-arrow-jts/src/main/java/org/locationtech/geomesa/arrow/jts/GeometryVector.java index 42bdd86dc47e..8379fc67ae08 100644 --- a/geomesa-arrow/geomesa-arrow-jts/src/main/java/org/locationtech/geomesa/arrow/jts/GeometryVector.java +++ b/geomesa-arrow/geomesa-arrow-jts/src/main/java/org/locationtech/geomesa/arrow/jts/GeometryVector.java @@ -12,6 +12,8 @@ import org.locationtech.jts.geom.Geometry; import org.locationtech.jts.geom.GeometryFactory; +import java.util.Map; + /** * Complex vector for geometries * @@ -32,4 +34,7 @@ public interface GeometryVector exten int getNullCount(); void transfer(int fromIndex, int toIndex, GeometryVector to); + + Map getOptions(); + void setOptions(Map map); } diff --git a/geomesa-arrow/geomesa-arrow-jts/src/main/java/org/locationtech/geomesa/arrow/jts/WKBGeometryVector.java b/geomesa-arrow/geomesa-arrow-jts/src/main/java/org/locationtech/geomesa/arrow/jts/WKBGeometryVector.java index 33a6cb731553..0eb24d0fb959 100644 --- a/geomesa-arrow/geomesa-arrow-jts/src/main/java/org/locationtech/geomesa/arrow/jts/WKBGeometryVector.java +++ b/geomesa-arrow/geomesa-arrow-jts/src/main/java/org/locationtech/geomesa/arrow/jts/WKBGeometryVector.java @@ -20,15 +20,17 @@ import org.locationtech.jts.io.WKBWriter; import javax.annotation.Nullable; +import java.util.HashMap; import java.util.Map; /** * Catch-all for storing instances of Geometry as WKB */ public class WKBGeometryVector implements GeometryVector { - private VarBinaryVector vector; + private final VarBinaryVector vector; private WKBWriter writer = null; private WKBReader reader = null; + protected final Map options = new HashMap<>(); public static final Field field = Field.nullablePrimitive("wkb", ArrowType.Binary.INSTANCE); @@ -108,4 +110,15 @@ public void transfer(int fromIndex, int toIndex, GeometryVector getOptions() { + return options; + } + + @Override + public void setOptions(Map map) { + options.clear(); + options.putAll(map); + } } diff --git 
a/geomesa-arrow/geomesa-arrow-jts/src/main/java/org/locationtech/geomesa/arrow/jts/impl/AbstractGeometryVector.java b/geomesa-arrow/geomesa-arrow-jts/src/main/java/org/locationtech/geomesa/arrow/jts/impl/AbstractGeometryVector.java index f952561438e1..6e551190f267 100644 --- a/geomesa-arrow/geomesa-arrow-jts/src/main/java/org/locationtech/geomesa/arrow/jts/impl/AbstractGeometryVector.java +++ b/geomesa-arrow/geomesa-arrow-jts/src/main/java/org/locationtech/geomesa/arrow/jts/impl/AbstractGeometryVector.java @@ -10,19 +10,39 @@ import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.types.pojo.Field; +import org.geotools.referencing.CRS; import org.locationtech.geomesa.arrow.jts.GeometryVector; import org.locationtech.jts.geom.Geometry; +import java.util.HashMap; import java.util.List; +import java.util.Map; public abstract class AbstractGeometryVector implements GeometryVector { private V ordinal; protected U vector; + protected final Map options; protected AbstractGeometryVector(U vector) { this.vector = vector; + this.options = new HashMap<>(); + } + + @Override + public Map getOptions() { + return options; + } + + @Override + public void setOptions(Map map) { + options.clear(); + options.putAll(map); + } + + protected boolean swapAxisOrder() { + return getOptions().get("axisOrder") == CRS.AxisOrder.EAST_NORTH; } @Override diff --git a/geomesa-arrow/geomesa-arrow-jts/src/main/java/org/locationtech/geomesa/arrow/jts/impl/AbstractPointVector.java b/geomesa-arrow/geomesa-arrow-jts/src/main/java/org/locationtech/geomesa/arrow/jts/impl/AbstractPointVector.java index 761b7c845ea0..5a505fa98e50 100644 --- a/geomesa-arrow/geomesa-arrow-jts/src/main/java/org/locationtech/geomesa/arrow/jts/impl/AbstractPointVector.java +++ b/geomesa-arrow/geomesa-arrow-jts/src/main/java/org/locationtech/geomesa/arrow/jts/impl/AbstractPointVector.java @@ -53,8 +53,13 @@ public void set(int index, Point geom) { vector.setNull(index); } else { vector.setNotNull(index); - 
writeOrdinal(index * 2, geom.getY()); - writeOrdinal(index * 2 + 1, geom.getX()); + if (swapAxisOrder()) { + writeOrdinal(y(index), geom.getX()); + writeOrdinal(x(index), geom.getY()); + } else { + writeOrdinal(y(index), geom.getY()); + writeOrdinal(x(index), geom.getX()); + } } } @@ -63,8 +68,14 @@ public Point get(int index) { if (vector.isNull(index)) { return null; } else { - final double y = readOrdinal(index * 2); - final double x = readOrdinal(index * 2 + 1); + final double y, x; + if (swapAxisOrder()) { + y = x(index); + x = y(index); + } else { + y = y(index); + x = x(index); + } return factory.createPoint(new Coordinate(x, y)); } } @@ -76,8 +87,13 @@ public void transfer(int fromIndex, int toIndex, GeometryVector s"$f ${if (r) "DESC" else "ASC" }"}.mkString(", ") @@ -178,5 +181,7 @@ object QueryHints { } } } + + def getAxisOrder: Option[AxisOrder] = Option(hints.get(AXIS_ORDER).asInstanceOf[AxisOrder]) } } diff --git a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/iterators/ArrowScan.scala b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/iterators/ArrowScan.scala index 68783c578184..5b8eb8179b1e 100644 --- a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/iterators/ArrowScan.scala +++ b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/iterators/ArrowScan.scala @@ -12,6 +12,7 @@ import com.typesafe.scalalogging.LazyLogging import org.apache.arrow.vector.ipc.message.IpcOption import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType} import org.geotools.api.filter.Filter +import org.geotools.referencing.CRS.AxisOrder import org.geotools.util.factory.Hints import org.locationtech.geomesa.arrow.io._ import org.locationtech.geomesa.arrow.vector.SimpleFeatureVector.SimpleFeatureEncoding @@ -43,9 +44,10 @@ trait ArrowScan extends AggregatingScan[ArrowScan.ArrowAggregate] { val arrowSft = transform.getOrElse(sft) val includeFids = options(IncludeFidsKey).toBoolean val proxyFids = 
options.get(ProxyFidsKey).exists(_.toBoolean) + val axisOrder = options.get(AxisOrderKey).map(AxisOrder.valueOf) + val encoding = SimpleFeatureEncoding.min(includeFids, proxyFids, axisOrder) val dictionary = options(DictionaryKey) val sort = options.get(SortKey).map(name => (name, options.get(SortReverseKey).exists(_.toBoolean))) - val encoding = SimpleFeatureEncoding.min(includeFids, proxyFids) val ipcOpts = FormatVersion.options(options(IpcVersionKey)) val dictionaries = dictionary.split(",").filterNot(_.isEmpty) new DeltaAggregate(arrowSft, dictionaries, encoding, ipcOpts, sort, batchSize) @@ -62,6 +64,7 @@ object ArrowScan extends LazyLogging { val IncludeFidsKey = "fids" val ProxyFidsKey = "proxy" + val AxisOrderKey = "axis-order" val DictionaryKey = "dict" val IpcVersionKey = "ipc" val SortKey = "sort" @@ -91,9 +94,10 @@ object ArrowScan extends LazyLogging { val arrowSft = hints.getTransformSchema.getOrElse(sft) val includeFids = hints.isArrowIncludeFid val proxyFids = hints.isArrowProxyFid + val axisOrder = hints.getAxisOrder.getOrElse(AxisOrder.NORTH_EAST) + val encoding = SimpleFeatureEncoding.min(includeFids, proxyFids, Some(axisOrder)) val sort = hints.getArrowSort val batchSize = getBatchSize(hints) - val encoding = SimpleFeatureEncoding.min(includeFids, proxyFids) val ipc = hints.getArrowFormatVersion.getOrElse(FormatVersion.ArrowFormatVersion.get) val ipcOpts = FormatVersion.options(ipc) val dictionaryFields = hints.getArrowDictionaryFields @@ -103,6 +107,7 @@ object ArrowScan extends LazyLogging { base ++ AggregatingScan.optionalMap( IncludeFidsKey -> includeFids.toString, ProxyFidsKey -> proxyFids.toString, + AxisOrderKey -> axisOrder.toString, IpcVersionKey -> ipc, SortKey -> sort.map(_._1), SortReverseKey -> sort.map(_._2.toString), @@ -272,12 +277,12 @@ object ArrowScan extends LazyLogging { SimpleFeatureTypes.createType(options(SftKey), options(SpecKey)) def encoding(e: SimpleFeatureEncoding): (String, String) = - EncodingKey -> 
s"${e.fids.getOrElse("")}:${e.geometry}:${e.date}" + EncodingKey -> s"${e.fids.getOrElse("")}:${e.geometry}:${e.date}:${e.axisOrder}" def encoding(options: Map[String, String]): SimpleFeatureEncoding = { - val Array(fids, geom, dtg) = options(EncodingKey).split(":") + val Array(fids, geom, dtg, axisOrder) = options(EncodingKey).split(":") val fidOpt = Option(fids).filterNot(_.isEmpty).map(Encoding.withName) - SimpleFeatureEncoding(fidOpt, Encoding.withName(geom), Encoding.withName(dtg)) + SimpleFeatureEncoding(fidOpt, Encoding.withName(geom), Encoding.withName(dtg), Encoding.withName(axisOrder)) } def ipcOption(options: Map[String, String]): IpcOption = FormatVersion.options(options(IpcKey)) diff --git a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/planning/LocalQueryRunner.scala b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/planning/LocalQueryRunner.scala index 5aee8795ee4c..bc6abdf944b9 100644 --- a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/planning/LocalQueryRunner.scala +++ b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/planning/LocalQueryRunner.scala @@ -264,7 +264,7 @@ object LocalQueryRunner extends LazyLogging { val sort = hints.getArrowSort.map(Seq.fill(1)(_)) val batchSize = ArrowScan.getBatchSize(hints) - val encoding = SimpleFeatureEncoding.min(hints.isArrowIncludeFid, hints.isArrowProxyFid) + val encoding = SimpleFeatureEncoding.min(hints.isArrowIncludeFid, hints.isArrowProxyFid, hints.getAxisOrder) val ipcOpts = FormatVersion.options(hints.getArrowFormatVersion.getOrElse(FormatVersion.ArrowFormatVersion.get)) val (features, arrowSft) = transform match { diff --git a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/view/MergedQueryRunner.scala b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/view/MergedQueryRunner.scala index 6369644ddf7f..f0ff9bd9bcac 100644 --- 
a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/view/MergedQueryRunner.scala +++ b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/view/MergedQueryRunner.scala @@ -142,7 +142,7 @@ class MergedQueryRunner( val arrowSft = QueryPlanner.extractQueryTransforms(sft, query).map(_._1).getOrElse(sft) val sort = hints.getArrowSort val batchSize = ArrowScan.getBatchSize(hints) - val encoding = SimpleFeatureEncoding.min(hints.isArrowIncludeFid, hints.isArrowProxyFid) + val encoding = SimpleFeatureEncoding.min(hints.isArrowIncludeFid, hints.isArrowProxyFid, hints.getAxisOrder) val ipcOpts = FormatVersion.options(hints.getArrowFormatVersion.getOrElse(FormatVersion.ArrowFormatVersion.get)) val dictionaryFields = hints.getArrowDictionaryFields diff --git a/geomesa-process/geomesa-process-vector/src/main/scala/org/locationtech/geomesa/process/transform/ArrowConversionProcess.scala b/geomesa-process/geomesa-process-vector/src/main/scala/org/locationtech/geomesa/process/transform/ArrowConversionProcess.scala index cc3ed07f1e33..25868cc5ac14 100644 --- a/geomesa-process/geomesa-process-vector/src/main/scala/org/locationtech/geomesa/process/transform/ArrowConversionProcess.scala +++ b/geomesa-process/geomesa-process-vector/src/main/scala/org/locationtech/geomesa/process/transform/ArrowConversionProcess.scala @@ -16,6 +16,7 @@ import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType} import org.geotools.data.simple.SimpleFeatureCollection import org.geotools.feature.visitor._ import org.geotools.process.factory.{DescribeParameter, DescribeProcess, DescribeResult} +import org.geotools.referencing.CRS.AxisOrder import org.locationtech.geomesa.arrow.ArrowProperties import org.locationtech.geomesa.arrow.io.{FormatVersion, SimpleFeatureArrowFileWriter} import org.locationtech.geomesa.arrow.vector.ArrowDictionary @@ -67,7 +68,9 @@ class ArrowConversionProcess extends GeoMesaProcess with LazyLogging { @DescribeParameter(name = "batchSize", 
description = "Number of features to include in each record batch", min = 0) batchSize: java.lang.Integer, @DescribeParameter(name = "flattenStruct", description = "Removes the outer SFT struct yielding top level feature access", min = 0) - flattenStruct: java.lang.Boolean + flattenStruct: java.lang.Boolean, + @DescribeParameter(name = "axisOrder", description = "Coordinate Reference System (CRS) axis order", min = 0) + axisOrder: AxisOrder ): java.util.Iterator[Array[Byte]] = { import scala.collection.JavaConverters._ @@ -84,7 +87,7 @@ class ArrowConversionProcess extends GeoMesaProcess with LazyLogging { } } - val encoding = SimpleFeatureEncoding.min(includeFids == null || includeFids, proxyFids != null && proxyFids) + val encoding = SimpleFeatureEncoding.min(includeFids == null || includeFids, proxyFids != null && proxyFids, Option(axisOrder)) val ipcVersion = Option(formatVersion).getOrElse(FormatVersion.ArrowFormatVersion.get) val reverse = Option(sortReverse).map(_.booleanValue()) val batch = Option(batchSize).map(_.intValue).getOrElse(ArrowProperties.BatchSize.get.toInt) diff --git a/geomesa-process/geomesa-process-vector/src/test/scala/org/locationtech/geomesa/process/transform/ArrowConversionProcessTest.scala b/geomesa-process/geomesa-process-vector/src/test/scala/org/locationtech/geomesa/process/transform/ArrowConversionProcessTest.scala index 28f7190507cb..f1eb0e8d7c66 100644 --- a/geomesa-process/geomesa-process-vector/src/test/scala/org/locationtech/geomesa/process/transform/ArrowConversionProcessTest.scala +++ b/geomesa-process/geomesa-process-vector/src/test/scala/org/locationtech/geomesa/process/transform/ArrowConversionProcessTest.scala @@ -48,7 +48,7 @@ class ArrowConversionProcessTest extends Specification { "ArrowConversionProcess" should { "encode an empty feature collection" in { - val bytes = process.execute(new ListFeatureCollection(sft), null, null, null, null, null, null, null, null).asScala.reduce(_ ++ _) + val bytes = process.execute(new 
ListFeatureCollection(sft), null, null, null, null, null, null, null, null, null).asScala.reduce(_ ++ _) WithClose(SimpleFeatureArrowFileReader.streaming(() => new ByteArrayInputStream(bytes))) { reader => reader.sft mustEqual sft SelfClosingIterator(reader.features()) must beEmpty @@ -56,7 +56,7 @@ class ArrowConversionProcessTest extends Specification { } "encode a generic feature collection" in { - val bytes = process.execute(collection, null, null, null, null, null, null, null, null).asScala.reduce(_ ++ _) + val bytes = process.execute(collection, null, null, null, null, null, null, null, null, null).asScala.reduce(_ ++ _) WithClose(SimpleFeatureArrowFileReader.streaming(() => new ByteArrayInputStream(bytes))) { reader => reader.sft mustEqual sft SelfClosingIterator(reader.features()).map(ScalaSimpleFeature.copy).toSeq must @@ -65,7 +65,7 @@ class ArrowConversionProcessTest extends Specification { } "encode a generic feature collection with flattened struct" in { - val bytes = process.execute(collection, null, null, null, null, null, null, null, true).asScala.reduce(_ ++ _) + val bytes = process.execute(collection, null, null, null, null, null, null, null, true, null).asScala.reduce(_ ++ _) val rootAllocator = new RootAllocator() val reader = new ArrowStreamReader(new ByteArrayInputStream(bytes), rootAllocator) @@ -86,12 +86,12 @@ class ArrowConversionProcessTest extends Specification { "encode a generic empty feature collection with dictionary values without leaking memory" in { // This returns an empty iterator. 
- process.execute(new ListFeatureCollection(sft), null, null, null, Collections.singletonList("name"), null, null, null, null) + process.execute(new ListFeatureCollection(sft), null, null, null, Collections.singletonList("name"), null, null, null, null, null) ArrowAllocator.getAllocatedMemory(sft.getTypeName) must equalTo(0) } "encode a generic feature collection with dictionary values" in { - val bytes = process.execute(collection, null, null, null, Collections.singletonList("name"), null, null, null, null).asScala.reduce(_ ++ _) + val bytes = process.execute(collection, null, null, null, Collections.singletonList("name"), null, null, null, null, null).asScala.reduce(_ ++ _) WithClose(SimpleFeatureArrowFileReader.streaming(() => new ByteArrayInputStream(bytes))) { reader => reader.sft mustEqual sft SelfClosingIterator(reader.features()).map(ScalaSimpleFeature.copy).toSeq must @@ -101,7 +101,7 @@ class ArrowConversionProcessTest extends Specification { } "encode a generic feature collection with dictionary values and flattened struct" in { - val bytes = process.execute(collection, null, null, null, Collections.singletonList("name"), null, null, null, true).asScala.reduce(_ ++ _) + val bytes = process.execute(collection, null, null, null, Collections.singletonList("name"), null, null, null, true, null).asScala.reduce(_ ++ _) val rootAllocator = new RootAllocator() val reader = new ArrowStreamReader(new ByteArrayInputStream(bytes), rootAllocator) @@ -121,12 +121,12 @@ class ArrowConversionProcessTest extends Specification { } "encode a generic feature collection with sorting" in { - val ascending = process.execute(collection, null, null, null, null, "dtg", null, null, null).asScala.reduce(_ ++ _) + val ascending = process.execute(collection, null, null, null, null, "dtg", null, null, null, null).asScala.reduce(_ ++ _) WithClose(SimpleFeatureArrowFileReader.streaming(() => new ByteArrayInputStream(ascending))) { reader => reader.sft mustEqual sft 
SelfClosingIterator(reader.features()).map(ScalaSimpleFeature.copy).toSeq mustEqual features } - val descending = process.execute(collection, null, null, null, null, "dtg", true, null, null).asScala.reduce(_ ++ _) + val descending = process.execute(collection, null, null, null, null, "dtg", true, null, null, null).asScala.reduce(_ ++ _) WithClose(SimpleFeatureArrowFileReader.streaming(() => new ByteArrayInputStream(descending))) { reader => reader.sft mustEqual sft SelfClosingIterator(reader.features()).map(ScalaSimpleFeature.copy).toSeq mustEqual features.reverse @@ -134,13 +134,13 @@ class ArrowConversionProcessTest extends Specification { } "encode a generic feature collection with sorting and dictionary values" in { - val ascending = process.execute(collection, null, null, null, Collections.singletonList("name"), "dtg", null, null, null).asScala.reduce(_ ++ _) + val ascending = process.execute(collection, null, null, null, Collections.singletonList("name"), "dtg", null, null, null, null).asScala.reduce(_ ++ _) WithClose(SimpleFeatureArrowFileReader.streaming(() => new ByteArrayInputStream(ascending))) { reader => reader.sft mustEqual sft SelfClosingIterator(reader.features()).map(ScalaSimpleFeature.copy).toSeq mustEqual features reader.dictionaries.get("name") must beSome } - val descending = process.execute(collection, null, null, null, Collections.singletonList("name"), "dtg", true, null, null).asScala.reduce(_ ++ _) + val descending = process.execute(collection, null, null, null, Collections.singletonList("name"), "dtg", true, null, null, null).asScala.reduce(_ ++ _) WithClose(SimpleFeatureArrowFileReader.streaming(() => new ByteArrayInputStream(descending))) { reader => reader.sft mustEqual sft SelfClosingIterator(reader.features()).map(ScalaSimpleFeature.copy).toSeq mustEqual features.reverse