diff --git a/docs/user/accumulo/commandline.rst b/docs/user/accumulo/commandline.rst
index 1b5d03eeb7bb..72d4e3b6c9fc 100644
--- a/docs/user/accumulo/commandline.rst
+++ b/docs/user/accumulo/commandline.rst
@@ -98,7 +98,7 @@ in the Accumulo documentation for additional details.
 .. warning::
 
-    The two feature types must be identical, and both must be partitioned (see :ref:`partitioned_indices`).
+    The two feature types must be identical.
 
 ==========================  ==================================================================================================
 Argument                    Description
 ==========================  ==================================================================================================
@@ -120,18 +120,19 @@ Argument                    Description
 ``-f, --feature-name *``    The name of the schema to copy
 ``--export-path *``         HDFS path to used for file export - the scheme and authority (e.g. bucket name) must match the
                             destination table filesystem
-``--partition``             Partition(s) to copy
-``--partition-value``       Value(s) used to indicate partitions to copy (e.g. ``2024-01-01T00:00:00.000Z``)
+``--partition``             Partition(s) to copy (if schema is partitioned)
+``--partition-value``       Value(s) used to indicate partitions to copy (e.g. ``2024-01-01T00:00:00.000Z``) (if schema is
+                            partitioned)
 ``-t, --threads``           Number of index tables to copy concurrently, default 1
 ``--file-threads``          Number of files to copy concurrently, per table, default 2
 ``--distcp``                Use Hadoop DistCp to move files from one cluster to the other, instead of normal file copies
+``--resume``                Resume a previously interrupted run from where it left off
 ==========================  ==================================================================================================
 
 .. note::
 
-    At least one ``--partition`` or ``--partition-value`` must be specified.
-
-``--partition`` and/or ``--partition-value`` may be specified multiple times in order to copy multiple partitions.
+    ``--partition`` and/or ``--partition-value`` may be specified multiple times in order to copy multiple partitions, or omitted
+    to copy all existing partitions.
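For orientation, here is a sketch of how the updated options fit together. This is illustrative only: the ``bulk-copy``
command name and the ``geomesa-accumulo`` launcher are inferred from the surrounding documentation, and the instance,
zookeeper, catalog, and bucket values are placeholders. Omitting ``--partition``/``--partition-value`` copies every
existing partition, and ``--resume`` picks up a previously interrupted run:

.. code-block:: bash

    geomesa-accumulo bulk-copy \
        --from-instance sourceCloud --from-zookeepers zoo1,zoo2,zoo3 \
        --from-user admin --from-password secret --from-catalog geomesa.catalog \
        --to-instance destCloud --to-zookeepers zoo4,zoo5,zoo6 \
        --to-user admin --to-password secret --to-catalog geomesa.catalog \
        -f my-feature \
        --export-path s3a://my-bucket/geomesa/export \
        --distcp --resume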
``bulk-ingest`` ^^^^^^^^^^^^^^^ diff --git a/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/ingest/AccumuloBulkCopyCommand.scala b/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/ingest/AccumuloBulkCopyCommand.scala index 28246bc9be4a..8f36d2d1c1a9 100644 --- a/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/ingest/AccumuloBulkCopyCommand.scala +++ b/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/ingest/AccumuloBulkCopyCommand.scala @@ -29,10 +29,11 @@ import org.locationtech.geomesa.utils.concurrent.CachedThreadPool import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes import org.locationtech.geomesa.utils.io.{CloseWithLogging, WithClose} -import java.io.{Closeable, File} +import java.io.{Closeable, File, IOException} import java.nio.charset.StandardCharsets import java.util.Collections -import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicBoolean +import scala.collection.mutable.ListBuffer import scala.util.Try import scala.util.control.NonFatal @@ -51,72 +52,6 @@ object AccumuloBulkCopyCommand extends LazyLogging { import scala.collection.JavaConverters._ - @Parameters(commandDescription = "Bulk copy RFiles to a different cluster") - class AccumuloBulkCopyParams extends RequiredTypeNameParam { - - @Parameter(names = Array("--from-instance"), description = "Source Accumulo instance name", required = true) - var fromInstance: String = _ - @Parameter(names = Array("--from-zookeepers"), description = "Zookeepers for the source instance (host[:port], comma separated)", required = true) - var fromZookeepers: String = _ - @Parameter(names = Array("--from-user"), description = "User name for the source instance", required = true) - var fromUser: String = _ - @Parameter(names = Array("--from-keytab"), description = "Path to Kerberos keytab file for the source instance") - var fromKeytab: String = _ - @Parameter(names = Array("--from-password"), description = "Connection password for the source instance") - var fromPassword: String = _ - @Parameter(names = Array("--from-catalog"), description = "Catalog table containing the source feature type", required = true) - var fromCatalog: String = _ - @Parameter(names = Array("--from-config"), description = "Additional Hadoop configuration file(s) to use for the source instance") - var fromConfigs: java.util.List[File] = _ - - @Parameter(names = Array("--to-instance"), description = "Destination Accumulo instance name", required = true) - var toInstance: String = _ - @Parameter(names = Array("--to-zookeepers"), description = "Zookeepers for the destination instance (host[:port], comma separated)", required = true) - var toZookeepers: String = _ - @Parameter(names = Array("--to-user"), description = "User name for the destination instance", required = true) - var toUser: String = _ - @Parameter(names = Array("--to-keytab"), description = "Path to Kerberos keytab file for the destination instance") - var toKeytab: String = _ - @Parameter(names = Array("--to-password"), description = "Connection password for the destination instance") - var toPassword: String = _ - @Parameter(names = Array("--to-catalog"), description = "Catalog table containing the destination feature type", required = true) - var toCatalog: String = _ - @Parameter(names = Array("--to-config"), description = "Additional Hadoop configuration file(s) to use 
for the destination instance") - var toConfigs: java.util.List[File] = _ - - @Parameter( - names = Array("--export-path"), - description = "HDFS path to use for source table export - the scheme and authority (e.g. bucket name) must match the destination table filesystem", - required = true) - var exportPath: String = _ - - @Parameter(names = Array("--partition"), description = "Partition(s) to copy") - var partitions: java.util.List[String] = _ - - @Parameter(names = Array("--partition-value"), description = "Value(s) used to indicate partitions to copy (e.g. dates)") - var partitionValues: java.util.List[String] = _ - - @Parameter(names = Array("--index"), description = "Specific index(es) to copy, instead of all indices") - var indices: java.util.List[String] = _ - - @Parameter( - names = Array("-t", "--threads"), - description = "Number of index tables to copy concurrently", - validateWith = Array(classOf[PositiveInteger])) - var tableThreads: java.lang.Integer = 1 - - @Parameter( - names = Array("--file-threads"), - description = "Number of files to copy concurrently, per table", - validateWith = Array(classOf[PositiveInteger])) - var fileThreads: java.lang.Integer = 2 - - @Parameter( - names = Array("--distcp"), - description = "Use Hadoop DistCp to move files from one cluster to the other, instead of normal file copies") - var distCp: Boolean = false - } - /** * Encapsulates the logic for the bulk copy operation * @@ -155,24 +90,6 @@ object AccumuloBulkCopyCommand extends LazyLogging { // sft should be shareable/the same from both datastores lazy private val sft: SimpleFeatureType = from.ds.getSchema(params.featureName) - lazy private val partitions: Seq[String] = { - val builder = Seq.newBuilder[String] - if (params.partitions != null) { - builder ++= params.partitions.asScala - } - if (params.partitionValues != null && !params.partitionValues.isEmpty) { - val partitioning = TablePartition(from.ds, sft).get - val sf = new ScalaSimpleFeature(sft, "") - builder ++= params.partitionValues.asScala.map { value => - sf.setAttribute(sft.getDtgIndex.get, value) - val partition = partitioning.partition(sf) - logger.debug(s"Generated partition $partition from value $value") - partition - } - } - builder.result.distinct.sorted - } - lazy private val indices = { val all = from.ds.manager.indices(sft) if (params.indices == null || params.indices.isEmpty) { all } else { @@ -190,6 +107,41 @@ object AccumuloBulkCopyCommand extends LazyLogging { } } + // these get passed into our index method calls - for partitioned schemas, it must be a Seq[Some[_]], + // while for non-partitioned schemas it must always be Seq(None) + lazy private val partitions: Seq[Option[String]] = { + if (sft.isPartitioned) { + val builder = ListBuffer.empty[String] + if (params.partitions != null && !params.partitions.isEmpty) { + builder ++= params.partitions.asScala + } + if (params.partitionValues != null && !params.partitionValues.isEmpty) { + val partitioning = TablePartition(from.ds, sft).get + val sf = new ScalaSimpleFeature(sft, "") + builder ++= params.partitionValues.asScala.map { value => + sf.setAttribute(sft.getDtgIndex.get, value) + val partition = partitioning.partition(sf) + logger.debug(s"Generated partition $partition from value $value") + partition + } + } + if (builder.isEmpty) { + logger.debug("No partitions specified - loading all partitions from store") + indices.foreach { index => + builder ++= index.getPartitions + } + } + builder.result.distinct.sorted.map(Option.apply) + } else { + if 
((params.partitions != null && !params.partitions.isEmpty) || + (params.partitionValues != null && !params.partitionValues.isEmpty)) { + throw new ParameterException("--partition and/or --partition-value are not applicable for a non-partitioned schema") + } + Seq(None) + } + } + + override def run(): Unit = { // validate our params/setup if (exportPath.toUri.getScheme == "file") { @@ -197,30 +149,35 @@ object AccumuloBulkCopyCommand extends LazyLogging { } if (sft == null) { throw new ParameterException(s"Schema '${params.featureName}' does not exist in the source store") - } else if (!sft.isPartitioned) { - throw new ParameterException(s"Schema '${params.featureName}' is not partitioned") } else { val toSft = to.ds.getSchema(params.featureName) if (toSft == null) { throw new ParameterException(s"Schema '${params.featureName}' does not exist in the destination store") - } else if (!toSft.isPartitioned) { - throw new ParameterException(s"Schema '${params.featureName}' is not partitioned in the destination store") } else if (SimpleFeatureTypes.compare(sft, toSft) != 0) { throw new ParameterException(s"Schema '${params.featureName}' is not the same in the source and destination store") + } else if (SimpleFeatureTypes.compareIndexConfigs(sft, toSft) != 0) { + throw new ParameterException( + s"Schema '${params.featureName}' does not have compatible indices in the source and destination store") } } - if (partitions.isEmpty) { - throw new ParameterException("At least one of --partition or --partition-value is required") - } // now execute the copy CachedThreadPool.executor(params.tableThreads) { executor => partitions.foreach { partition => indices.map { fromIndex => val toIndex = to.ds.manager.index(sft, fromIndex.identifier) - val runnable: Runnable = () => try { copy(fromIndex, toIndex, partition) } catch { - // catch Throwable so NoClassDefFound still gets logged - case e: Throwable => logger.error(s"Error copying partition $partition ${fromIndex.identifier}", e) + val partitionLogId = s"${partition.fold(s"index")(p => s"partition $p")} ${fromIndex.identifier}" + val runnable: Runnable = () => { + try { + Command.user.info(s"Copying $partitionLogId") + copy(fromIndex, toIndex, partition, partitionLogId) + Command.user.info(s"Bulk copy complete for $partitionLogId") + } catch { + // catch Throwable so NoClassDefFound still gets logged + case e: Throwable => + Command.user.error(s"Error copying $partitionLogId: ${e.getMessage}") + logger.error(s"Error copying $partitionLogId", e) + } } executor.submit(runnable) } @@ -228,53 +185,87 @@ object AccumuloBulkCopyCommand extends LazyLogging { } } - private def copy(fromIndex: GeoMesaFeatureIndex[_, _], toIndex: GeoMesaFeatureIndex[_, _], partition: String): Unit = { - Command.user.info(s"Copying partition $partition ${fromIndex.identifier}") + /** + * Copy a single index + partition + * + * @param fromIndex from index + * @param toIndex to index + * @param partition partition name - must be Some if schema is partitioned + * @param partitionLogId identifier for log messages + */ + private def copy( + fromIndex: GeoMesaFeatureIndex[_, _], + toIndex: GeoMesaFeatureIndex[_, _], + partition: Option[String], + partitionLogId: String): Unit = { + require(sft.isPartitioned == partition.isDefined) // sanity check - this should always be true due to our setup + + val fromTable = try { fromIndex.getTableName(partition) } catch { + case NonFatal(e) => throw new RuntimeException("Could not get source table", e) + } - val fromTable = 
fromIndex.getTableNames(Some(partition)).headOption.orNull - if (fromTable == null) { - Command.user.warn(s"Ignoring non-existent partition ${fromIndex.identifier} $partition") - return + val completeMarker = new Path(exportPath, s"$fromTable.complete") + if (exportFs.exists(completeMarker)) { + if (params.resume) { + logger.debug("Skipping already completed copy") + return + } else { + exportFs.delete(completeMarker, false) + } } val tableExportPath = new Path(exportPath, fromTable) + val distcpPath = new Path(tableExportPath, "distcp.txt") + val copyToDir = new Path(tableExportPath, "files") + val cloneTable = s"${fromTable}_bc_tmp" + logger.debug(s"Source table $fromTable (${from.tableOps.tableIdMap().get(fromTable)})") logger.debug(s"Export path $tableExportPath") - if (exportFs.exists(tableExportPath)) { - logger.debug(s"Deleting existing export directory $tableExportPath") - exportFs.delete(tableExportPath, true) + + if (params.resume && from.tableOps.exists(cloneTable)) { + logger.debug(s"Using existing cloned table $cloneTable - ensuring table is offline") + from.tableOps.offline(cloneTable, true) + } else { + // clone the table as we have to take it offline in order to export it + // note that cloning is just a metadata op as it shares the underlying data files (until they change) + logger.debug(s"Checking for existence and deleting any existing cloned table $cloneTable") + from.ds.adapter.deleteTable(cloneTable) // no-op if table doesn't exist + logger.debug(s"Cloning $fromTable to $cloneTable") + from.tableOps.clone(fromTable, cloneTable, false, Collections.emptyMap(), Collections.emptySet()) // use 2.0 method for compatibility + logger.debug(s"Taking $cloneTable offline") + from.tableOps.offline(cloneTable, true) } - // clone the table as we have to take it offline in order to export it - // note that cloning is just a metadata op as it shares the underlying data files (until they change) - val cloneTable = s"${fromTable}_tmp" - logger.debug(s"Checking for existence and deleting any existing cloned table $cloneTable") - from.ds.adapter.deleteTable(cloneTable) // no-op if table doesn't exist - logger.debug(s"Cloning $fromTable to $cloneTable") - from.tableOps.clone(fromTable, cloneTable, false, Collections.emptyMap(), Collections.emptySet()) // use 2.0 method for compatibility - logger.debug(s"Taking offline $cloneTable") - from.tableOps.offline(cloneTable, true) - logger.debug(s"Exporting table to $tableExportPath") - from.tableOps.exportTable(cloneTable, tableExportPath.toString) - val distcpPath = new Path(tableExportPath, "distcp.txt") - if (!exportFs.exists(distcpPath)) { - throw new RuntimeException(s"Could not read table export results: $distcpPath") + if (params.resume && exportFs.exists(distcpPath) && exportFs.getFileStatus(distcpPath).getLen > 0) { + logger.debug(s"Using existing export results $distcpPath") + } else { + if (exportFs.exists(tableExportPath)) { + logger.debug(s"Deleting existing export directory $tableExportPath") + exportFs.delete(tableExportPath, true) + } + logger.debug(s"Exporting table to $tableExportPath") + from.tableOps.exportTable(cloneTable, tableExportPath.toString) + + if (!exportFs.exists(distcpPath) || exportFs.getFileStatus(distcpPath).getLen == 0) { + throw new RuntimeException(s"Could not read table export results at $distcpPath") + } } // ensures the destination table exists - logger.debug(s"Checking destination table for $fromTable") - to.ds.adapter.createTable(toIndex, Some(partition), Seq.empty) - val toTable = 
toIndex.getTableNames(Some(partition)).headOption.getOrElse { - throw new RuntimeException(s"Could not get destination table for index ${fromIndex.identifier} and partition $partition") + logger.debug(s"Checking destination for table $fromTable") + to.ds.adapter.createTable(toIndex, partition, Seq.empty) + val toTable = try { toIndex.getTableName(partition) } catch { + case NonFatal(e) => throw new RuntimeException("Could not get destination table", e) } logger.debug(s"Destination table $toTable (${to.tableOps.tableIdMap().get(toTable)})") + // create splits, do this separately in case the table already exists val splits = new java.util.TreeSet(from.tableOps.listSplits(cloneTable)) val existingSplits = to.tableOps.listSplits(toTable) splits.removeAll(existingSplits) if (!splits.isEmpty) { if (!existingSplits.isEmpty) { - val warning = s"Detected split mismatch between source ($fromTable) and destination ($toTable)" + val warning = s"Detected split mismatch between source ($fromTable) and destination ($toTable) for $partitionLogId" Command.user.warn(warning) logger.warn(warning) } @@ -282,39 +273,28 @@ object AccumuloBulkCopyCommand extends LazyLogging { to.tableOps.addSplits(toTable, splits) } - val dirs = Collections.newSetFromMap(new ConcurrentHashMap[String, java.lang.Boolean]()) - val copyToDir = new Path(tableExportPath, "files") - if (params.distCp) { - new DistributedCopy(from.conf).copy(distcpPath, copyToDir, TerminalCallback()) match { - case JobSuccess(message, counts) => - dirs.add(copyToDir.toString) - Command.user.info(message) - logger.debug(s"Distributed copy counters: ${counts.mkString("\n ", "\n ", "")}") - - case JobFailure(message) => - Command.user.error(message) - logger.error(message) - } - } else { + val hadCopyError = new AtomicBoolean(false) + // read the distcp.txt file produced by the table export + // consumer: (src, dest) => Unit + def distCpConsumer(threads: Int)(consumer: (Path, Path) => Unit): Unit = { logger.debug(s"Reading $distcpPath") WithClose(IOUtils.lineIterator(exportFs.open(distcpPath), StandardCharsets.UTF_8)) { files => - CachedThreadPool.executor(params.fileThreads) { executor => + CachedThreadPool.executor(threads) { executor => files.asScala.foreach { file => val runnable: Runnable = () => { val path = new Path(file) val copy = new Path(copyToDir, path.getName) try { - logger.debug(s"Copying $path to $copy") - val fs = path.getFileSystem(from.conf) - if (FileUtil.copy(fs, path, exportFs, copy, false, false, to.conf)) { - dirs.add(copyToDir.toString) + if (params.resume && exportFs.exists(copy) && + path.getFileSystem(from.conf).getFileStatus(path).getLen == exportFs.getFileStatus(copy).getLen) { + logger.debug(s"Using existing copy of $path at $copy") } else { - Command.user.error(s"Failed to copy $path to $copy") - logger.error(s"Failed to copy $path to $copy") + consumer(path, copy) } } catch { // catch Throwable so NoClassDefFound still gets logged case e: Throwable => + hadCopyError.set(true) Command.user.error(s"Failed to copy $path to $copy") logger.error(s"Failed to copy $path to $copy", e) } @@ -325,24 +305,60 @@ object AccumuloBulkCopyCommand extends LazyLogging { } } - dirs.asScala.toSeq.sorted.foreach { dir => - logger.debug(s"Loading rfiles from $dir to $toTable") - val importDir = to.tableOps.importDirectory(dir).to(toTable) - try { importDir.ignoreEmptyDir(true) } catch { - case _: NoSuchMethodError => // accumulo 2.0, ignore + if (params.distCp) { + var inputPath = distcpPath + if (params.resume) { + logger.debug(s"Checking copy 
status of files in $distcpPath") + inputPath = new Path(tableExportPath, "distcp-remaining.txt") + WithClose(exportFs.create(inputPath, true)) { out => + distCpConsumer(1) { (path, _) => + logger.debug(s"Adding $path to distcp") + out.writeUTF(s"$path\n") + } + } } - try { importDir.load() } catch { - case e: IllegalArgumentException => logger.trace("Error importing directory:", e) // should mean empty dir + new DistributedCopy(from.conf).copy(inputPath, copyToDir, TerminalCallback()) match { + case JobSuccess(message, counts) => + Command.user.info(message) + logger.debug(s"Distributed copy counters: ${counts.mkString("\n ", "\n ", "")}") + + case JobFailure(message) => + hadCopyError.set(true) + Command.user.error(message) + logger.error(message) + } + } else { + distCpConsumer(params.fileThreads) { (path, copy) => + logger.debug(s"Copying $path to $copy") + val fs = path.getFileSystem(from.conf) + if (!FileUtil.copy(fs, path, exportFs, copy, false, true, to.conf)) { + // consolidate error handling in the catch block + throw new IOException(s"Failed to copy $path to $copy, copy returned false") + } } } + if (hadCopyError.get) { + throw new RuntimeException("Error copying data files") + } + + logger.debug(s"Loading rfiles from $copyToDir to $toTable") + val importDir = to.tableOps.importDirectory(copyToDir.toString).to(toTable) + try { importDir.ignoreEmptyDir(true) } catch { + case _: NoSuchMethodError => // accumulo 2.0, ignore + } + try { importDir.load() } catch { + case e: IllegalArgumentException => logger.trace("Error importing directory:", e) // should mean empty dir + } + + // create marker indicating this copy was successful + logger.debug(s"Creating completion marker $completeMarker") + exportFs.create(completeMarker).close() // cleanup - logger.debug(s"Deleting clone table $cloneTable") - from.tableOps.delete(cloneTable) logger.debug(s"Deleting export path $tableExportPath") exportFs.delete(tableExportPath, true) - - Command.user.info(s"Bulk copy complete for partition $partition ${fromIndex.identifier}") + logger.debug(s"Deleting clone table $cloneTable") + from.tableOps.delete(cloneTable) } override def close(): Unit = { @@ -398,4 +414,78 @@ object AccumuloBulkCopyCommand extends LazyLogging { override def close(): Unit = ds.dispose() } + + @Parameters(commandDescription = "Bulk copy RFiles to a different cluster") + class AccumuloBulkCopyParams extends RequiredTypeNameParam { + + @Parameter(names = Array("--from-instance"), description = "Source Accumulo instance name", required = true) + var fromInstance: String = _ + @Parameter(names = Array("--from-zookeepers"), description = "Zookeepers for the source instance (host[:port], comma separated)", required = true) + var fromZookeepers: String = _ + @Parameter(names = Array("--from-user"), description = "User name for the source instance", required = true) + var fromUser: String = _ + @Parameter(names = Array("--from-keytab"), description = "Path to Kerberos keytab file for the source instance") + var fromKeytab: String = _ + @Parameter(names = Array("--from-password"), description = "Connection password for the source instance") + var fromPassword: String = _ + @Parameter(names = Array("--from-catalog"), description = "Catalog table containing the source feature type", required = true) + var fromCatalog: String = _ + @Parameter(names = Array("--from-config"), description = "Additional Hadoop configuration file(s) to use for the source instance") + var fromConfigs: java.util.List[File] = _ + + @Parameter(names = 
Array("--to-instance"), description = "Destination Accumulo instance name", required = true) + var toInstance: String = _ + @Parameter(names = Array("--to-zookeepers"), description = "Zookeepers for the destination instance (host[:port], comma separated)", required = true) + var toZookeepers: String = _ + @Parameter(names = Array("--to-user"), description = "User name for the destination instance", required = true) + var toUser: String = _ + @Parameter(names = Array("--to-keytab"), description = "Path to Kerberos keytab file for the destination instance") + var toKeytab: String = _ + @Parameter(names = Array("--to-password"), description = "Connection password for the destination instance") + var toPassword: String = _ + @Parameter(names = Array("--to-catalog"), description = "Catalog table containing the destination feature type", required = true) + var toCatalog: String = _ + @Parameter(names = Array("--to-config"), description = "Additional Hadoop configuration file(s) to use for the destination instance") + var toConfigs: java.util.List[File] = _ + + @Parameter( + names = Array("--export-path"), + description = "HDFS path to use for source table export - the scheme and authority (e.g. bucket name) must match the destination table filesystem", + required = true) + var exportPath: String = _ + + @Parameter(names = Array("--partition"), description = "Partition(s) to copy (if schema is partitioned)") + var partitions: java.util.List[String] = _ + + @Parameter( + names = Array("--partition-value"), + description = "Value(s) (e.g. dates) used to indicate partitions to copy (if schema is partitioned)") + var partitionValues: java.util.List[String] = _ + + @Parameter(names = Array("--index"), description = "Specific index(es) to copy, instead of all indices") + var indices: java.util.List[String] = _ + + @Parameter( + names = Array("-t", "--threads"), + description = "Number of index tables to copy concurrently", + validateWith = Array(classOf[PositiveInteger])) + var tableThreads: java.lang.Integer = 1 + + @Parameter( + names = Array("--file-threads"), + description = "Number of files to copy concurrently, per table", + validateWith = Array(classOf[PositiveInteger])) + var fileThreads: java.lang.Integer = 2 + + @Parameter( + names = Array("--distcp"), + description = "Use Hadoop DistCp to move files from one cluster to the other, instead of normal file copies") + var distCp: Boolean = false + + @Parameter( + names = Array("--resume"), + description = "Resume a previously interrupted run from where it left off") + var resume: Boolean = false + } + } diff --git a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/api/GeoMesaFeatureIndex.scala b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/api/GeoMesaFeatureIndex.scala index de652e6ad085..73276cdb56a5 100644 --- a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/api/GeoMesaFeatureIndex.scala +++ b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/api/GeoMesaFeatureIndex.scala @@ -155,7 +155,23 @@ abstract class GeoMesaFeatureIndex[T, U](val ds: GeoMesaDataStore[_], } /** - * Gets the table name for this index + * Gets the single table name for this index. If this is a partitioned index, then the partition argument + * must be defined. A runtime exception will be thrown if multiple or zero tables are found. 
+   *
+   * @param partition get the name for a particular partition, if the index is partitioned
+   * @return
+   */
+  def getTableName(partition: Option[String] = None): String = {
+    val names = getTableNames(partition)
+    if (names.lengthCompare(1) == 0) { names.head } else {
+      val list = if (names.isEmpty) { "" } else { names.mkString(": ", ", ", "") }
+      throw new RuntimeException(
+        s"Expected 1 table but found ${names.length} for index $identifier${partition.fold("")(p => s" partition $p")}$list")
+    }
+  }
+
+  /**
+   * Gets table names for this index
    *
    * @param partition get the name for a particular partition, or all partitions
    * @return
diff --git a/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/geotools/SimpleFeatureTypes.scala b/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/geotools/SimpleFeatureTypes.scala
index ce0a8c22de70..88f80ced2e20 100644
--- a/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/geotools/SimpleFeatureTypes.scala
+++ b/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/geotools/SimpleFeatureTypes.scala
@@ -72,7 +72,7 @@ object SimpleFeatureTypes {
     val ENABLED_INDEX_OPTS: Seq[String] = Seq(EnabledIndices, "geomesa.indexes.enabled", "table.indexes.enabled")
   }
 
-  private [geomesa] object InternalConfigs {
+  private[geomesa] object InternalConfigs {
     val GeomesaPrefix = "geomesa."
     val TableSharingPrefix = "geomesa.table.sharing.prefix"
     val UserDataPrefix = "geomesa.user-data.prefix"
@@ -96,12 +96,38 @@ object SimpleFeatureTypes {
     val OptStats = "keep-stats"
   }
 
-  private [geomesa] object AttributeConfigs {
+  private[geomesa] object AttributeConfigs {
     val UserDataListType = "subtype"
     val UserDataMapKeyType = "keyclass"
     val UserDataMapValueType = "valueclass"
   }
 
+  private val IndexChecks: Seq[SimpleFeatureType => Any] =
+    Seq(
+      _.getIndices,
+      _.getDtgField,
+      _.isUuidEncoded,
+      _.getAttributeShards,
+      _.getIdShards,
+      _.getZ3Shards,
+      _.getZ2Shards,
+      _.getVisibilityLevel,
+      _.getXZPrecision,
+      _.getZ3Interval,
+      _.getS3Interval,
+      _.getCompression,
+      _.isPartitioned,
+      _.getTableSharingPrefix,
+    )
+
+  private val AttributeChecks: Seq[AttributeDescriptor => Any] =
+    Seq(
+      _.getUserData.get(AttributeOptions.OptIndexValue),
+      _.getUserData.get(AttributeOptions.OptJson),
+      _.getUserData.get(AttributeOptions.OptPrecision),
+      _.getUserData.get(AttributeOptions.OptColumnGroups),
+    )
+
   private val cache = new ConcurrentHashMap[(String, String), ImmutableSimpleFeatureType]()
 
   /**
@@ -465,6 +491,27 @@ object SimpleFeatureTypes {
     }
   }
 
+  /**
+   * Compares that index tables are the same format (key and value) between the two feature types
+   *
+   * @param a first type
+   * @param b second type
+   * @return 0 if indices are the same, non-zero otherwise
+   */
+  def compareIndexConfigs(a: SimpleFeatureType, b: SimpleFeatureType): Int = {
+    def compareAttributes(i: Int): Boolean = {
+      val (ad, bd) = (a.getDescriptor(i), b.getDescriptor(i))
+      ad.getType.getBinding == bd.getType.getBinding && AttributeChecks.forall(c => c(ad) == c(bd))
+    }
+    if (a.getAttributeCount == b.getAttributeCount &&
+        IndexChecks.forall(c => c(a) == c(b)) &&
+        Range(0, a.getAttributeCount).forall(compareAttributes)) {
+      0
+    } else {
+      1
+    }
+  }
+
   private def createFeatureType(namespace: String, name: String, spec: SimpleFeatureSpec,
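As a closing illustration of the new ``SimpleFeatureTypes.compareIndexConfigs`` check used by the bulk copy validation,
here is a minimal sketch. The type name, spec string, and the ``geomesa.table.partition`` user-data key are assumptions
for the example rather than part of this change; the point is that two schemas with identical attributes but different
index layouts compare as non-zero:

.. code-block:: scala

    import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes

    // identical attribute layout for both hypothetical schemas
    val spec = "name:String,dtg:Date,*geom:Point:srid=4326"
    val a = SimpleFeatureTypes.createType("example", spec)
    // 'b' additionally declares time-based table partitioning (assumed user-data key)
    val b = SimpleFeatureTypes.createType("example", s"$spec;geomesa.table.partition='time'")

    // a schema always matches itself
    assert(SimpleFeatureTypes.compareIndexConfigs(a, a) == 0)
    // attributes are equal, but the partitioning (and thus the index layout) differs,
    // so the bulk copy validation would reject this pair
    assert(SimpleFeatureTypes.compareIndexConfigs(a, b) != 0)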