Skip to content

Commit

Permalink
Arrow: introduce an axis order parameter
Browse files Browse the repository at this point in the history
Geomesa currently defaults to always reading and writing Arrow encoding as NORTH_EAST, with the new axis order parameter allow the user to request the encoding to be explicitly specified per request.

Axis Order parameter supported values:
- NORTH_EAST translates to: North = Latitude = Y Axis, East = Longitude = X Axis
- EAST_NORTH translates to: East = Longitude = X Axis, North = Latitude = Y Axis

The request parameter is optional and if not specified axis order defaults to the existing behavior which is NORTH_EAST.
  • Loading branch information
epyatkevich committed Oct 28, 2024
1 parent b7f5d62 commit 37d4ead
Show file tree
Hide file tree
Showing 15 changed files with 159 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,23 @@ class ArrowConversionProcessTest extends TestWithFeatureType {

"ArrowConversionProcess" should {
"encode an empty feature collection" in {
val bytes = process.execute(new ListFeatureCollection(sft), null, null, null, null, null, null, null, null).asScala.reduce(_ ++ _)
val bytes = process.execute(new ListFeatureCollection(sft), null, null, null, null, null, null, null, null, null).asScala.reduce(_ ++ _)
WithClose(SimpleFeatureArrowFileReader.streaming(() => new ByteArrayInputStream(bytes))) { reader =>
reader.sft mustEqual sft
SelfClosingIterator(reader.features()) must beEmpty
}
}

"encode an empty accumulo feature collection" in {
val bytes = process.execute(fs.getFeatures(ECQL.toFilter("bbox(geom,20,20,30,30)")), null, null, null, null, null, null, null, null).asScala.reduce(_ ++ _)
val bytes = process.execute(fs.getFeatures(ECQL.toFilter("bbox(geom,20,20,30,30)")), null, null, null, null, null, null, null, null, null).asScala.reduce(_ ++ _)
WithClose(SimpleFeatureArrowFileReader.streaming(() => new ByteArrayInputStream(bytes))) { reader =>
reader.sft mustEqual sft
SelfClosingIterator(reader.features()) must beEmpty
}
}

"encode an accumulo feature collection in distributed fashion" in {
val bytes = process.execute(fs.getFeatures(Filter.INCLUDE), null, null, null, null, null, null, null, null).asScala.reduce(_ ++ _)
val bytes = process.execute(fs.getFeatures(Filter.INCLUDE), null, null, null, null, null, null, null, null, null).asScala.reduce(_ ++ _)
WithClose(SimpleFeatureArrowFileReader.streaming(() => new ByteArrayInputStream(bytes))) { reader =>
reader.sft mustEqual sft
SelfClosingIterator(reader.features()).map(ScalaSimpleFeature.copy).toSeq must
Expand All @@ -70,7 +70,7 @@ class ArrowConversionProcessTest extends TestWithFeatureType {

"encode an accumulo feature collection in distributed fashion with calculated dictionary values" in {
val filter = ECQL.toFilter("name = 'name0'")
val bytes = process.execute(fs.getFeatures(filter), null, null, null, Collections.singletonList("name"), null, null, null, null).asScala.reduce(_ ++ _)
val bytes = process.execute(fs.getFeatures(filter), null, null, null, Collections.singletonList("name"), null, null, null, null, null).asScala.reduce(_ ++ _)
WithClose(SimpleFeatureArrowFileReader.streaming(() => new ByteArrayInputStream(bytes))) { reader =>
reader.sft mustEqual sft
SelfClosingIterator(reader.features()).map(ScalaSimpleFeature.copy).toSeq must
Expand All @@ -81,15 +81,15 @@ class ArrowConversionProcessTest extends TestWithFeatureType {
}

"sort and encode an accumulo feature collection in distributed fashion" in {
val bytes = process.execute(fs.getFeatures(Filter.INCLUDE), null, null, null, null, "dtg", null, null, null).asScala.reduce(_ ++ _)
val bytes = process.execute(fs.getFeatures(Filter.INCLUDE), null, null, null, null, "dtg", null, null, null, null).asScala.reduce(_ ++ _)
WithClose(SimpleFeatureArrowFileReader.streaming(() => new ByteArrayInputStream(bytes))) { reader =>
reader.sft mustEqual sft
SelfClosingIterator(reader.features()).map(ScalaSimpleFeature.copy).toList mustEqual features
}
}

"reverse sort and encode an accumulo feature collection in distributed fashion" in {
val bytes = process.execute(fs.getFeatures(Filter.INCLUDE), null, null, null, null, "dtg", Boolean.box(true), null, null).asScala.reduce(_ ++ _)
val bytes = process.execute(fs.getFeatures(Filter.INCLUDE), null, null, null, null, "dtg", Boolean.box(true), null, null, null).asScala.reduce(_ ++ _)
WithClose(SimpleFeatureArrowFileReader.streaming(() => new ByteArrayInputStream(bytes))) { reader =>
reader.sft mustEqual sft
SelfClosingIterator(reader.features()).map(ScalaSimpleFeature.copy).toList mustEqual features.reverse
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import org.apache.arrow.vector.types.Types.MinorType
import org.apache.arrow.vector.types.pojo.{ArrowType, FieldType}
import org.geotools.api.feature.`type`.AttributeDescriptor
import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.geotools.referencing.CRS.AxisOrder
import org.locationtech.geomesa.arrow.jts._
import org.locationtech.geomesa.arrow.vector.SimpleFeatureVector.SimpleFeatureEncoding
import org.locationtech.geomesa.arrow.vector.SimpleFeatureVector.SimpleFeatureEncoding.Encoding
Expand Down Expand Up @@ -192,7 +193,7 @@ object ArrowAttributeWriter {
case ObjectType.LONG => new ArrowLongWriter(name, metadata, factory)
case ObjectType.FLOAT => new ArrowFloatWriter(name, metadata, factory)
case ObjectType.DOUBLE => new ArrowDoubleWriter(name, metadata, factory)
case ObjectType.GEOMETRY => geometry(name, bindings(1), encoding.geometry, metadata, factory)
case ObjectType.GEOMETRY => geometry(name, bindings(1), encoding, metadata, factory)
case ObjectType.BOOLEAN => new ArrowBooleanWriter(name, metadata, factory)
case ObjectType.LIST => new ArrowListWriter(name, bindings(1), encoding, metadata, factory)
case ObjectType.MAP => new ArrowMapWriter(name, bindings(1), bindings(2), encoding, metadata, factory)
Expand Down Expand Up @@ -234,11 +235,11 @@ object ArrowAttributeWriter {
private def geometry(
name: String,
binding: ObjectType,
encoding: Encoding,
encoding: SimpleFeatureEncoding,
metadata: Map[String, String],
factory: VectorFactory): ArrowGeometryWriter = {
val m = metadata.asJava
val vector = (binding, encoding, factory) match {
val vector = (binding, encoding.geometry, factory) match {
case (ObjectType.POINT, Encoding.Min, FromStruct(c)) => new PointFloatVector(name, c, m)
case (ObjectType.POINT, Encoding.Min, FromAllocator(c)) => new PointFloatVector(name, c, m)
case (ObjectType.POINT, Encoding.Max, FromStruct(c)) => new PointVector(name, c, m)
Expand Down Expand Up @@ -269,7 +270,7 @@ object ArrowAttributeWriter {
case (_, _, FromList(_)) => throw new NotImplementedError("Geometry lists are not supported")
case _ => throw new IllegalArgumentException(s"Unexpected geometry type $binding")
}
new ArrowGeometryWriter(name, vector.asInstanceOf[GeometryVector[Geometry, FieldVector]])
new ArrowGeometryWriter(name, vector.asInstanceOf[GeometryVector[Geometry, FieldVector]], encoding)
}

trait ArrowDictionaryWriter extends ArrowAttributeWriter {
Expand Down Expand Up @@ -464,13 +465,21 @@ object ArrowAttributeWriter {
/**
* Writes geometries - delegates to our JTS geometry vectors
*/
class ArrowGeometryWriter(val name: String, delegate: GeometryVector[Geometry, FieldVector])
class ArrowGeometryWriter(val name: String, delegate: GeometryVector[Geometry, FieldVector], encoding: SimpleFeatureEncoding)
extends ArrowAttributeWriter {

override def vector: FieldVector = delegate.getVector

// note: delegate handles nulls
override def apply(i: Int, value: AnyRef): Unit = delegate.set(i, value.asInstanceOf[Geometry])
override def apply(i: Int, value: AnyRef): Unit = {
val options = Map("axisOrder" -> (encoding.axisOrder match {
case Encoding.Max => AxisOrder.EAST_NORTH /* Lon/Lat */
case _ => AxisOrder.NORTH_EAST /* Lat/Lon */
})).asInstanceOf[Map[String, AnyRef]].asJava

delegate.setOptions(options)
delegate.set(i, value.asInstanceOf[Geometry])
}

override def setValueCount(count: Int): Unit = delegate.setValueCount(count)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import org.apache.arrow.vector.types.FloatingPointPrecision
import org.apache.arrow.vector.types.pojo.{ArrowType, FieldType}
import org.apache.arrow.vector.{BigIntVector, FieldVector}
import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.geotools.referencing.CRS
import org.geotools.referencing.CRS.AxisOrder
import org.geotools.util.factory.Hints
import org.locationtech.geomesa.arrow.ArrowAllocator
import org.locationtech.geomesa.arrow.features.ArrowSimpleFeature
import org.locationtech.geomesa.arrow.jts.GeometryFields
Expand Down Expand Up @@ -132,16 +135,20 @@ object SimpleFeatureVector {
val DescriptorKey = "descriptor"
val OptionsKey = "options"

case class SimpleFeatureEncoding(fids: Option[Encoding], geometry: Encoding, date: Encoding)
case class SimpleFeatureEncoding(fids: Option[Encoding], geometry: Encoding, date: Encoding, axisOrder: Encoding)

object SimpleFeatureEncoding {

val Min: SimpleFeatureEncoding = SimpleFeatureEncoding(Some(Encoding.Min), Encoding.Min, Encoding.Min)
val Max: SimpleFeatureEncoding = SimpleFeatureEncoding(Some(Encoding.Max), Encoding.Max, Encoding.Max)
val Min: SimpleFeatureEncoding = SimpleFeatureEncoding(Some(Encoding.Min), Encoding.Min, Encoding.Min, Encoding.Min)
val Max: SimpleFeatureEncoding = SimpleFeatureEncoding(Some(Encoding.Max), Encoding.Max, Encoding.Max, Encoding.Max)

def min(includeFids: Boolean, proxyFids: Boolean = false): SimpleFeatureEncoding = {
def min(includeFids: Boolean, proxyFids: Boolean = false, axisOrder: Option[AxisOrder] = None): SimpleFeatureEncoding = {
val fids = if (includeFids) { Some(if (proxyFids) { Encoding.Min } else { Encoding.Max }) } else { None }
SimpleFeatureEncoding(fids, Encoding.Min, Encoding.Min)
val axisOrderEncoded = axisOrder match {
case Some(AxisOrder.EAST_NORTH) => Encoding.Max /* Lon/Lat */
case _ => Encoding.Min /* Lat/Lon */
}
SimpleFeatureEncoding(fids, Encoding.Min, Encoding.Min, axisOrderEncoded)
}

object Encoding extends Enumeration {
Expand Down Expand Up @@ -245,7 +252,18 @@ object SimpleFeatureVector {
val isLong = dateVector.exists(_.isInstanceOf[BigIntVector])
if (isLong) { Encoding.Max } else { Encoding.Min }
}
val encoding = SimpleFeatureEncoding(fidEncoding, geomPrecision, datePrecision)
val axisOrderPrecision = {
if (java.lang.Boolean.getBoolean("org.geotools.referencing.forceXY") ||
Hints.getSystemDefault(Hints.FORCE_LONGITUDE_FIRST_AXIS_ORDER) == java.lang.Boolean.TRUE) {
Encoding.Max /* Lon/Lat */
} else {
CRS.getAxisOrder(sft.getCoordinateReferenceSystem) match {
case AxisOrder.EAST_NORTH => Encoding.Max /* Lon/Lat */
case _ => Encoding.Min /* Lat/Lon */
}
}
}
val encoding = SimpleFeatureEncoding(fidEncoding, geomPrecision, datePrecision, axisOrderPrecision)

(sft, encoding)
}
Expand Down
4 changes: 4 additions & 0 deletions geomesa-arrow/geomesa-arrow-jts/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
<groupId>org.locationtech.jts</groupId>
<artifactId>jts-core</artifactId>
</dependency>
<dependency>
<groupId>org.geotools</groupId>
<artifactId>gt-referencing</artifactId>
</dependency>

<!-- not used, but we can't exclude direct inherited dependencies, so just mark it provided -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.geom.GeometryFactory;

import java.util.Map;

/**
* Complex vector for geometries
*
Expand All @@ -32,4 +34,7 @@ public interface GeometryVector<T extends Geometry, V extends FieldVector> exten
int getNullCount();

void transfer(int fromIndex, int toIndex, GeometryVector<T, V> to);

Map<String, Object> getOptions();
void setOptions(Map<String, Object> map);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@
import org.locationtech.jts.io.WKBWriter;

import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;

/**
* Catch-all for storing instances of Geometry as WKB
*/
public class WKBGeometryVector implements GeometryVector<Geometry, VarBinaryVector> {
private VarBinaryVector vector;
private final VarBinaryVector vector;
private WKBWriter writer = null;
private WKBReader reader = null;
protected final Map<String, Object> options = new HashMap<>();

public static final Field field = Field.nullablePrimitive("wkb", ArrowType.Binary.INSTANCE);

Expand Down Expand Up @@ -108,4 +110,15 @@ public void transfer(int fromIndex, int toIndex, GeometryVector<Geometry, VarBin
public void close() throws Exception {
vector.close();
}

@Override
public Map<String, Object> getOptions() {
return options;
}

@Override
public void setOptions(Map<String, Object> map) {
options.clear();
options.putAll(map);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,39 @@

import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.types.pojo.Field;
import org.geotools.referencing.CRS;
import org.locationtech.geomesa.arrow.jts.GeometryVector;
import org.locationtech.jts.geom.Geometry;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public abstract class AbstractGeometryVector<T extends Geometry, U extends FieldVector, V extends FieldVector>
implements GeometryVector<T, U> {

private V ordinal;
protected U vector;
protected final Map<String, Object> options;

protected AbstractGeometryVector(U vector) {
this.vector = vector;
this.options = new HashMap<>();
}

@Override
public Map<String, Object> getOptions() {
return options;
}

@Override
public void setOptions(Map<String, Object> map) {
options.clear();
options.putAll(map);
}

protected boolean swapAxisOrder() {
return getOptions().get("axisOrder") == CRS.AxisOrder.EAST_NORTH;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,13 @@ public void set(int index, Point geom) {
vector.setNull(index);
} else {
vector.setNotNull(index);
writeOrdinal(index * 2, geom.getY());
writeOrdinal(index * 2 + 1, geom.getX());
if (swapAxisOrder()) {
writeOrdinal(y(index), geom.getX());
writeOrdinal(x(index), geom.getY());
} else {
writeOrdinal(y(index), geom.getY());
writeOrdinal(x(index), geom.getX());
}
}
}

Expand All @@ -63,8 +68,14 @@ public Point get(int index) {
if (vector.isNull(index)) {
return null;
} else {
final double y = readOrdinal(index * 2);
final double x = readOrdinal(index * 2 + 1);
final double y, x;
if (swapAxisOrder()) {
y = x(index);
x = y(index);
} else {
y = y(index);
x = x(index);
}
return factory.createPoint(new Coordinate(x, y));
}
}
Expand All @@ -76,8 +87,13 @@ public void transfer(int fromIndex, int toIndex, GeometryVector<Point, FixedSize
((FixedSizeListVector) typed.vector).setNull(toIndex);
} else {
((FixedSizeListVector) typed.vector).setNotNull(toIndex);
typed.writeOrdinal(toIndex * 2, readOrdinal(fromIndex * 2));
typed.writeOrdinal(toIndex * 2 + 1, readOrdinal(fromIndex * 2 + 1));
if (swapAxisOrder()) {
typed.writeOrdinal(y(toIndex), getCoordinateX(fromIndex));
typed.writeOrdinal(x(toIndex), getCoordinateY(fromIndex));
} else {
typed.writeOrdinal(y(toIndex), getCoordinateY(fromIndex));
typed.writeOrdinal(x(toIndex), getCoordinateX(fromIndex));
}
}
}

Expand All @@ -88,7 +104,7 @@ public void transfer(int fromIndex, int toIndex, GeometryVector<Point, FixedSize
* @return y ordinate
*/
public double getCoordinateY(int index) {
return readOrdinal(index * 2);
return readOrdinal(y(index));

}

Expand All @@ -99,6 +115,20 @@ public double getCoordinateY(int index) {
* @return x ordinate
*/
public double getCoordinateX(int index) {
return readOrdinal(index * 2 + 1);
return readOrdinal(x(index));
}

/**
* Calculate the Y index
*/
protected int y(int index) {
return index * 2;
}

/**
* Calculate the X index
*/
protected int x(int index) {
return index * 2 + 1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class ArrowExporter(out: OutputStream, hints: Hints) extends FeatureExporter {
import org.locationtech.geomesa.index.conf.QueryHints.RichHints

private lazy val sort = hints.getArrowSort
private lazy val encoding = SimpleFeatureEncoding.min(hints.isArrowIncludeFid, hints.isArrowProxyFid)
private lazy val encoding = SimpleFeatureEncoding.min(hints.isArrowIncludeFid, hints.isArrowProxyFid, hints.getAxisOrder)
private lazy val ipc = hints.getArrowFormatVersion.getOrElse(FormatVersion.ArrowFormatVersion.get)
private lazy val batchSize = hints.getArrowBatchSize.getOrElse(ArrowProperties.BatchSize.get.toInt)
private lazy val dictionaryFields = hints.getArrowDictionaryFields
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import org.geotools.api.feature.simple.SimpleFeatureType
import org.geotools.api.filter.sort.{SortBy, SortOrder}
import org.geotools.geometry.jts.ReferencedEnvelope
import org.geotools.referencing.CRS
import org.geotools.referencing.CRS.AxisOrder
import org.geotools.util.factory.Hints
import org.geotools.util.factory.Hints.{ClassKey, IntegerKey}
import org.locationtech.geomesa.index.conf.FilterCompatibility.FilterCompatibility
Expand Down Expand Up @@ -67,6 +68,8 @@ object QueryHints {

val FILTER_COMPAT = new ClassKey(classOf[java.lang.String])

val AXIS_ORDER = new ClassKey(classOf[AxisOrder])

def sortReadableString(sort: Seq[(String, Boolean)]): String =
sort.map { case (f, r) => s"$f ${if (r) "DESC" else "ASC" }"}.mkString(", ")

Expand Down Expand Up @@ -178,5 +181,7 @@ object QueryHints {
}
}
}

def getAxisOrder: Option[AxisOrder] = Option(hints.get(AXIS_ORDER).asInstanceOf[AxisOrder])
}
}
Loading

0 comments on commit 37d4ead

Please sign in to comment.