Skip to content

Commit

Permalink
GEOMESA-3219 Fix auth check for attribute-level vis in row key (#3163)
Browse files Browse the repository at this point in the history
* Move index value code out of accumulo module
* Remove unnecessary AccumuloWritableFeature
* Make WritableFeature impls private
  • Loading branch information
elahrvivaz authored Sep 9, 2024
1 parent 5e8f12d commit 2e6cbda
Show file tree
Hide file tree
Showing 22 changed files with 322 additions and 352 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ class AccumuloIndexAdapter(ds: AccumuloDataStore) extends TableManager(ds.connec
indices: Seq[GeoMesaFeatureIndex[_, _]],
partition: Option[String],
atomic: Boolean): IndexWriter = {
val wrapper = AccumuloWritableFeature.wrapper(sft, groups, indices)
val wrapper = WritableFeature.wrapper(sft, groups)
(atomic, sft.isVisibilityRequired) match {
case (false, false) => new AccumuloIndexWriter(ds, indices, wrapper, partition)
case (false, true) => new AccumuloIndexWriter(ds, indices, wrapper, partition) with RequiredVisibilityWriter
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ import org.geotools.util.factory.Hints
import org.junit.runner.RunWith
import org.locationtech.geomesa.accumulo.TestWithFeatureType
import org.locationtech.geomesa.features.ScalaSimpleFeature
import org.locationtech.geomesa.index.conf.QueryHints
import org.locationtech.geomesa.index.index.attribute.AttributeIndex
import org.locationtech.geomesa.index.index.id.IdIndex
import org.locationtech.geomesa.index.index.z2.Z2Index
import org.locationtech.geomesa.index.index.z3.Z3Index
import org.locationtech.geomesa.security.SecurityUtils
import org.locationtech.geomesa.utils.collection.SelfClosingIterator
import org.locationtech.geomesa.utils.io.WithClose
import org.specs2.runner.JUnitRunner

@RunWith(classOf[JUnitRunner])
Expand Down Expand Up @@ -68,72 +70,83 @@ class AccumuloDataStoreAttributeVisibilityTest extends TestWithFeatureType {
addFeatures(Seq(userFeature, adminFeature, mixedFeature))
}

def queryByAuths(auths: String, filter: String, expectedStrategy: String): Seq[SimpleFeature] = {
val ds = DataStoreFinder.getDataStore((dsParams ++ Map(AccumuloDataStoreParams.AuthsParam.key -> auths)).asJava).asInstanceOf[AccumuloDataStore]
val query = new Query(sftName, ECQL.toFilter(filter))
val plans = ds.getQueryPlan(query)
forall(plans)(_.filter.index.name mustEqual expectedStrategy)
SelfClosingIterator(ds.getFeatureReader(query, Transaction.AUTO_COMMIT)).toSeq
def queryByAuths(auths: String, filter: String, strategy: String): Seq[SimpleFeature] = {
val params = dsParams ++ Map(AccumuloDataStoreParams.AuthsParam.key -> auths)
WithClose(DataStoreFinder.getDataStore(params.asJava).asInstanceOf[AccumuloDataStore]) { ds =>
val query = new Query(sftName, ECQL.toFilter(filter))
query.getHints.put(QueryHints.QUERY_INDEX, strategy)
val plans = ds.getQueryPlan(query)
forall(plans)(_.filter.index.name mustEqual strategy)
SelfClosingIterator(ds.getFeatureReader(query, Transaction.AUTO_COMMIT)).toList
}
}

val filters = Seq(
("IN ('user', 'admin', 'mixed')", IdIndex.name),
("bbox(geom, -121, 44, -119, 48)", Z2Index.name),
("bbox(geom, -121, 44, -119, 48) AND dtg DURING 2014-01-01T00:00:00.000Z/2014-01-04T00:00:00.000Z", Z3Index.name),
("name = 'name-user' OR name = 'name-admin' OR name = 'name-mixed'", AttributeIndex.name)
VisTestCase("IN ('user', 'admin', 'mixed')", Seq(IdIndex.name), returnMixed = true),
VisTestCase("bbox(geom, -121, 44, -119, 48)", Seq(Z2Index.name), returnMixed = true),
// these two cases filter on a non-visible attribute so shouldn't return the 'mixed' feature
VisTestCase("bbox(geom, -121, 44, -119, 48) AND dtg DURING 2014-01-01T00:00:00.000Z/2014-01-04T00:00:00.000Z", Seq(Z3Index.name), returnMixed = false),
VisTestCase("(name = 'name-user' OR name = 'name-admin' OR name = 'name-mixed')", Seq(AttributeIndex.name, Z3Index.name), returnMixed = false)
)

"AccumuloDataStore" should {
"correctly return all features with appropriate auths" in {
forall(filters) { case (filter, strategy) =>
val features = queryByAuths("admin,user", filter, strategy)
features must haveLength(3)
features must containTheSameElementsAs(Seq(userFeature, adminFeature, mixedFeature))
foreach(filters) { case VisTestCase(filter, strategies, _) =>
foreach(strategies) { strategy =>
val features = queryByAuths("admin,user", filter, strategy)
features must haveLength(3)
features must containTheSameElementsAs(Seq(userFeature, adminFeature, mixedFeature))
}
}
}
"correctly return some features with appropriate auths" in {
forall(filters) { case (filter, strategy) =>
val features = queryByAuths("user", filter, strategy)
features must haveLength(2)
features must contain(userFeature)
val m = features.find(_.getID == "mixed")
m must beSome
m.get.getAttribute(0) must beNull
m.get.getAttribute(1) mustEqual 12
m.get.getAttribute(2) must beNull
m.get.getAttribute(3) mustEqual mixedFeature.getAttribute(3)
foreach(filters) { case VisTestCase(filter, strategies, returnMixed) =>
foreach(strategies) { strategy =>
val features = queryByAuths("user", filter, strategy)
features must contain(userFeature)
if (!returnMixed) {
features must haveLength(1)
} else {
features must haveLength(2)
val m = features.find(_.getID == "mixed")
m must beSome
m.get.getAttribute(0) must beNull
m.get.getAttribute(1) mustEqual 12
m.get.getAttribute(2) must beNull
m.get.getAttribute(3) mustEqual mixedFeature.getAttribute(3)
}
}
}
}

"delete one record" in {
val ds = DataStoreFinder.getDataStore(dsParams.asJava).asInstanceOf[AccumuloDataStore]
val fs: SimpleFeatureStore = ds.getFeatureSource(sftName).asInstanceOf[SimpleFeatureStore]
val fs = ds.getFeatureSource(sftName)

val queryBefore = new Query(sftName, ECQL.toFilter("IN('user')"))
val resultsBefore = SelfClosingIterator(ds.getFeatureReader(queryBefore, Transaction.AUTO_COMMIT)).toSeq
resultsBefore.size mustEqual 1
resultsBefore must haveLength(1)

fs.removeFeatures(ECQL.toFilter("IN('user')"))

val queryAfter = new Query(sftName, ECQL.toFilter("IN('user')"))
val resultsAfter = SelfClosingIterator(ds.getFeatureReader(queryAfter, Transaction.AUTO_COMMIT)).toSeq
resultsAfter.size mustEqual 0
resultsAfter must beEmpty
}

"delete all records" in {
val ds = DataStoreFinder.getDataStore(dsParams.asJava).asInstanceOf[AccumuloDataStore]
val fs: SimpleFeatureStore = ds.getFeatureSource(sftName).asInstanceOf[SimpleFeatureStore]
val fs = ds.getFeatureSource(sftName)

val queryBefore = new Query(sftName, ECQL.toFilter("INCLUDE"))
val resultsBefore = SelfClosingIterator(ds.getFeatureReader(queryBefore, Transaction.AUTO_COMMIT)).toSeq
resultsBefore.size mustEqual 2 // We deleted one record in the prior test
resultsBefore must haveLength(2) // We deleted one record in the prior test

fs.removeFeatures(ECQL.toFilter("INCLUDE"))

val queryAfter = new Query(sftName, ECQL.toFilter("INCLUDE"))
val resultsAfter = SelfClosingIterator(ds.getFeatureReader(queryAfter, Transaction.AUTO_COMMIT)).toSeq
resultsAfter.size mustEqual 0
ok
resultsAfter must beEmpty
}
}

case class VisTestCase(filter: String, indices: Seq[String], returnMixed: Boolean)
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@

package org.locationtech.geomesa.accumulo.index

import org.geotools.api.feature.simple.SimpleFeatureType
import org.geotools.api.filter.Filter
import org.locationtech.geomesa.features.kryo.serialization.IndexValueSerializer
import org.locationtech.geomesa.filter.FilterHelper
import org.locationtech.geomesa.index.api._
import org.locationtech.geomesa.index.index.attribute.{AttributeIndex, AttributeIndexKey, AttributeIndexValues}
import org.geotools.api.feature.simple.SimpleFeatureType
import org.geotools.api.filter.Filter

/**
* Mixin trait to add join support to the normal attribute index class
Expand All @@ -27,7 +28,7 @@ trait AttributeJoinIndex extends GeoMesaFeatureIndex[AttributeIndexValues[Any],
private val attributeIndex = sft.indexOf(attribute)
private val descriptor = sft.getDescriptor(attributeIndex)
private val binding = descriptor.getType.getBinding
val indexSft: SimpleFeatureType = IndexValueEncoder.getIndexSft(sft)
val indexSft: SimpleFeatureType = IndexValueSerializer.getIndexSft(sft)

override val name: String = JoinIndex.name
override val identifier: String = GeoMesaFeatureIndex.identifier(name, version, attributes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,116 +8,20 @@

package org.locationtech.geomesa.accumulo.index

import org.geotools.api.feature.`type`.AttributeDescriptor
import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.geotools.feature.simple.SimpleFeatureTypeBuilder
import org.geotools.filter.identity.FeatureIdImpl
import org.locationtech.geomesa.features.SerializationOption.{SerializationOption, SerializationOptions}
import org.locationtech.geomesa.features.SimpleFeatureSerializer.LimitedSerialization
import org.locationtech.geomesa.features.kryo.{KryoFeatureSerializer, ProjectingKryoFeatureSerializer}
import org.locationtech.geomesa.features.{ScalaSimpleFeature, SimpleFeatureSerializer}
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes
import org.locationtech.geomesa.utils.geotools.sft.ImmutableSimpleFeatureType

import java.util.concurrent.ConcurrentHashMap
import org.geotools.api.feature.simple.SimpleFeatureType
import org.locationtech.geomesa.features.kryo.serialization.IndexValueSerializer

/**
* Serializer for attribute join indices
*/
object IndexValueEncoder {

import org.locationtech.geomesa.utils.geotools.RichAttributeDescriptors.RichAttributeDescriptor
import org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType.RichSimpleFeatureType

import scala.collection.JavaConverters._

private val cache = new ConcurrentHashMap[ImmutableSimpleFeatureType, ImmutableSimpleFeatureType]()

def apply(sft: SimpleFeatureType): SimpleFeatureSerializer =
new ProjectingKryoFeatureSerializer(sft, getIndexSft(sft), SerializationOptions.withoutId)

/**
* Gets a feature type compatible with the stored index value
*
* @param sft simple feature type
* @return
*/
def getIndexSft(sft: SimpleFeatureType): SimpleFeatureType = {
sft match {
case immutable: ImmutableSimpleFeatureType =>
var indexSft = cache.get(immutable)
if (indexSft == null) {
indexSft = SimpleFeatureTypes.immutable(buildIndexSft(sft)).asInstanceOf[ImmutableSimpleFeatureType]
cache.put(immutable, indexSft)
}
indexSft

case _ => buildIndexSft(sft)
}
}

private def buildIndexSft(sft: SimpleFeatureType): SimpleFeatureType = {
val builder = new SimpleFeatureTypeBuilder()
builder.setNamespaceURI(null: String)
builder.setName(sft.getTypeName + "--index")
builder.setAttributes(getIndexValueAttributes(sft).asJava)
if (sft.getGeometryDescriptor != null) {
builder.setDefaultGeometry(sft.getGeometryDescriptor.getLocalName)
}
builder.setCRS(sft.getCoordinateReferenceSystem)
val indexSft = builder.buildFeatureType()
indexSft.getUserData.putAll(sft.getUserData)
indexSft
}

/**
* Gets the attributes that are stored in the index value
*
* @param sft simple feature type
* @return
*/
private def getIndexValueAttributes(sft: SimpleFeatureType): Seq[AttributeDescriptor] = {
val geom = sft.getGeometryDescriptor
val dtg = sft.getDtgField
val attributes = scala.collection.mutable.Buffer.empty[AttributeDescriptor]
var i = 0
while (i < sft.getAttributeCount) {
val ad = sft.getDescriptor(i)
if (ad == geom || dtg.contains(ad.getLocalName) || ad.isIndexValue()) {
attributes.append(ad)
}
i += 1
}
attributes.toSeq
}
@deprecated("Replaced with org.locationtech.geomesa.features.kryo.serialization.IndexValueSerializer")
object IndexValueEncoder extends IndexValueSerializer {

/**
* Encoder/decoder for index values. Allows customizable fields to be encoded. Not thread-safe.
*
* @param sft simple feature type
*/
@deprecated
class IndexValueEncoderImpl(sft: SimpleFeatureType) extends SimpleFeatureSerializer with LimitedSerialization {

import scala.collection.JavaConverters._

private val indexSft = getIndexSft(sft)
private val encoder = KryoFeatureSerializer(indexSft)
private val reusableFeature = new ScalaSimpleFeature(indexSft, "")
private val indices = indexSft.getAttributeDescriptors.asScala.map(ad => sft.indexOf(ad.getLocalName)).toArray

override val options: Set[SerializationOption] = Set.empty

override def serialize(sf: SimpleFeature): Array[Byte] = {
reusableFeature.getIdentifier.asInstanceOf[FeatureIdImpl].setID(sf.getID)
var i = 0
while (i < indices.length) {
reusableFeature.setAttribute(i, sf.getAttribute(indices(i)))
i += 1
}
encoder.serialize(reusableFeature)
}

override def deserialize(value: Array[Byte]): SimpleFeature = encoder.deserialize(value)
}
class IndexValueEncoderImpl(sft: SimpleFeatureType) extends IndexValueSerializer.IndexValueEncoderImpl(sft)
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ class JoinIndex(ds: GeoMesaDataStore[_],
tier: Array[Byte],
id: Array[Byte],
lenient: Boolean): RowKeyValue[AttributeIndexKey] = {
val kv = super.toIndexKey(writable, tier, id, lenient)
kv.copy(values = writable.asInstanceOf[ReducedIndexValues].indexValues)
super.toIndexKey(writable, tier, id, lenient).copy(writable.reducedValues)
}
}
}
Expand Down
Loading

0 comments on commit 2e6cbda

Please sign in to comment.