88
99package org .locationtech .geomesa .accumulo .data
1010
11+ import com .github .benmanes .caffeine .cache .{CacheLoader , Caffeine }
1112import com .typesafe .scalalogging .LazyLogging
1213import org .apache .accumulo .core .data .{Key , Range , Value }
1314import org .apache .accumulo .core .file .keyfunctor .RowFunctor
1415import org .apache .hadoop .io .Text
1516import org .geotools .api .feature .simple .{SimpleFeature , SimpleFeatureType }
17+ import org .locationtech .geomesa .accumulo .AccumuloProperties .TableProperties .TableCacheExpiry
1618import org .locationtech .geomesa .accumulo .data .AccumuloIndexAdapter ._
1719import org .locationtech .geomesa .accumulo .data .AccumuloQueryPlan .{BatchScanPlan , EmptyPlan }
1820import org .locationtech .geomesa .accumulo .data .writer .tx .AccumuloAtomicIndexWriter
@@ -35,6 +37,7 @@ import org.locationtech.geomesa.index.index.z2.{Z2Index, Z2IndexValues}
3537import org .locationtech .geomesa .index .index .z3 .{Z3Index , Z3IndexValues }
3638import org .locationtech .geomesa .index .iterators .StatsScan
3739import org .locationtech .geomesa .index .planning .LocalQueryRunner .LocalTransformReducer
40+ import org .locationtech .geomesa .index .utils .Explainer
3841import org .locationtech .geomesa .security .SecurityUtils
3942import org .locationtech .geomesa .utils .concurrent .CachedThreadPool
4043import org .locationtech .geomesa .utils .geotools .SimpleFeatureTypes .{Configs , InternalConfigs }
@@ -58,6 +61,13 @@ class AccumuloIndexAdapter(ds: AccumuloDataStore)
5861
5962 private val tableOps = ds.connector.tableOperations()
6063
64+ private val tableSizeCache =
65+ Caffeine .newBuilder().expireAfterWrite(TableCacheExpiry .toJavaDuration.get).build[String , Integer ](
66+ new CacheLoader [String , Integer ]() {
67+ override def load (table : String ): Integer = tableOps.listSplits(table).size() + 1
68+ }
69+ )
70+
6171 // noinspection ScalaDeprecation
6272 override def createTable (
6373 index : GeoMesaFeatureIndex [_, _],
@@ -153,21 +163,12 @@ class AccumuloIndexAdapter(ds: AccumuloDataStore)
153163 override def createQueryPlan (strategy : QueryStrategy ): AccumuloQueryPlan = {
154164 import org .locationtech .geomesa .index .conf .QueryHints .RichHints
155165
156- val QueryStrategy (filter, byteRanges, _, _, ecql, hints, _) = strategy
157- val index = filter.index
158- // index api defines empty start/end for open-ended range - in accumulo, it's indicated with null
159- // index api defines start row inclusive, end row exclusive
160- val ranges = byteRanges.map {
161- case BoundedByteRange (start, end) =>
162- val startKey = if (start.length == 0 ) { null } else { new Key (new Text (start)) }
163- val endKey = if (end.length == 0 ) { null } else { new Key (new Text (end)) }
164- new Range (startKey, true , endKey, false )
165-
166- case SingleRowByteRange (row) =>
167- new Range (new Text (row))
168- }
166+ val index = strategy.index
167+ val ecql = strategy.ecql
168+ val hints = strategy.hints
169+ val ranges = strategy.ranges.map(toAccumuloRange)
169170 val numThreads = if (index.name == IdIndex .name) { ds.config.queries.recordThreads } else { ds.config.queries.threads }
170- val tables = index.getTablesForQuery(filter.filter)
171+ val tables = index.getTablesForQuery(strategy. filter.filter)
171172 val (colFamily, schema) = {
172173 val (cf, s) = groups.group(index.sft, hints.getTransformDefinition, ecql)
173174 (Some (new Text (ColumnFamilyMapper (index)(cf))), s)
@@ -180,10 +181,10 @@ class AccumuloIndexAdapter(ds: AccumuloDataStore)
180181
181182 index match {
182183 case i : AttributeJoinIndex =>
183- AccumuloJoinIndexAdapter .createQueryPlan(ds, i, filter , tables, ranges, colFamily, schema, ecql, hints, numThreads)
184+ AccumuloJoinIndexAdapter .createQueryPlan(ds, i, strategy , tables, ranges, colFamily, schema, ecql, hints, numThreads)
184185
185186 case _ =>
186- val (iter, eToF, reduce) = if (strategy. hints.isBinQuery) {
187+ val (iter, eToF, reduce) = if (hints.isBinQuery) {
187188 if (ds.config.remote.bin) {
188189 val iter = BinAggregatingIterator .configure(schema, index, ecql, hints)
189190 (Seq (iter), new AccumuloBinResultsToFeatures (), None )
@@ -195,9 +196,9 @@ class AccumuloIndexAdapter(ds: AccumuloDataStore)
195196 }
196197 (fti, resultsToFeatures, localReducer)
197198 }
198- } else if (strategy. hints.isArrowQuery) {
199+ } else if (hints.isArrowQuery) {
199200 if (ds.config.remote.arrow) {
200- val (iter, reduce) = ArrowIterator .configure(schema, index, ds.stats, filter.filter, ecql, hints)
201+ val (iter, reduce) = ArrowIterator .configure(schema, index, ds.stats, strategy. filter.filter, ecql, hints)
201202 (Seq (iter), new AccumuloArrowResultsToFeatures (), Some (reduce))
202203 } else {
203204 if (hints.isSkipReduce) {
@@ -207,7 +208,7 @@ class AccumuloIndexAdapter(ds: AccumuloDataStore)
207208 }
208209 (fti, resultsToFeatures, localReducer)
209210 }
210- } else if (strategy. hints.isDensityQuery) {
211+ } else if (hints.isDensityQuery) {
211212 if (ds.config.remote.density) {
212213 val iter = DensityIterator .configure(schema, index, ecql, hints)
213214 (Seq (iter), new AccumuloDensityResultsToFeatures (), None )
@@ -219,7 +220,7 @@ class AccumuloIndexAdapter(ds: AccumuloDataStore)
219220 }
220221 (fti, resultsToFeatures, localReducer)
221222 }
222- } else if (strategy. hints.isStatsQuery) {
223+ } else if (hints.isStatsQuery) {
223224 if (ds.config.remote.stats) {
224225 val iter = StatsIterator .configure(schema, index, ecql, hints)
225226 val reduce = Some (StatsScan .StatsReducer (schema, hints))
@@ -236,7 +237,7 @@ class AccumuloIndexAdapter(ds: AccumuloDataStore)
236237 (fti, resultsToFeatures, None )
237238 }
238239
239- if (ranges.isEmpty) { EmptyPlan (strategy.filter , reduce) } else {
240+ if (ranges.isEmpty) { EmptyPlan (strategy, reduce) } else {
240241 // configure additional iterators based on the index
241242 // TODO pull this out to be SPI loaded so that new indices can be added seamlessly
242243 val indexIter = if (index.name == Z3Index .name) {
@@ -272,7 +273,7 @@ class AccumuloIndexAdapter(ds: AccumuloDataStore)
272273 val max = hints.getMaxFeatures
273274 val project = hints.getProjection
274275
275- BatchScanPlan (filter , tables, ranges, iters, colFamily, eToF, reduce, sort, max, project, numThreads)
276+ BatchScanPlan (strategy , tables, ranges, iters, colFamily, eToF, reduce, sort, max, project, numThreads)
276277 }
277278 }
278279 }
@@ -290,12 +291,56 @@ class AccumuloIndexAdapter(ds: AccumuloDataStore)
290291 case (true , true ) => new AccumuloAtomicIndexWriter (ds, sft, indices, wrapper, partition) with RequiredVisibilityWriter
291292 }
292293 }
294+
295+ override def getStrategyCost (strategy : FilterStrategy , explain : Explainer ): Option [Long ] = {
296+ explain.pushLevel(s " Calculating cost for ${strategy.index.identifier}" )
297+ val start = System .currentTimeMillis()
298+ try {
299+ val tables = strategy.index.getTablesForQuery(strategy.filter)
300+ if (tables.isEmpty) {
301+ return Some (0L )
302+ }
303+ val ranges = strategy.getQueryStrategy(explain).ranges.map(toAccumuloRange).asJava
304+ val cost =
305+ tables.foldLeft(0d ) { case (sum, table) =>
306+ val numTablets = tableSizeCache.get(table)
307+ val tabletsScanned = tableOps.locate(table, ranges).groupByTablet().size()
308+ explain(s " Strategy hits $tabletsScanned/ $numTablets tablets for table $table" )
309+ val cost = 100 * (tabletsScanned.toDouble / numTablets)
310+ sum + cost
311+ }
312+ Some (cost.toLong)
313+ } finally {
314+ explain(s " Cost calculations took ${System .currentTimeMillis() - start}ms " ).popLevel()
315+ }
316+ }
293317}
294318
295319object AccumuloIndexAdapter {
296320
297321 val ZIterPriority = 23
298322
323+ /**
324+ * Converts a generic index-api range into an Accumulo range
325+ *
326+ * @param range range
327+ * @return
328+ */
329+ private def toAccumuloRange (range : ByteRange ): Range = range match {
330+ case BoundedByteRange (lower, upper) =>
331+ // index api defines empty start/end for open-ended range - in accumulo, it's indicated with null
332+ val start = if (lower.length == 0 ) { null } else { new Key (new Text (lower)) }
333+ val end = if (upper.length == 0 ) { null } else { new Key (new Text (upper)) }
334+ // index api defines start row inclusive, end row exclusive
335+ new Range (start, true , end, false )
336+
337+ case SingleRowByteRange (row) =>
338+ new Range (new Text (row))
339+
340+ case _ =>
341+ throw new IllegalArgumentException (s " Unexpected range type $range" )
342+ }
343+
299344 /**
300345 * Accumulo entries to features
301346 *
0 commit comments