
Commit 5f97774

GEOMESA-3443 Accumulo - Fix remote processing flags for join indices (#3270)
1 parent e0be030 commit 5f97774
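
Summary of the change, with a hedged sketch: previously, every local-vs-remote decision for the attribute join index was gated on the single ds.config.remote.bin flag, so disabling remote bin processing also disabled remote arrow, density, and stats scans (and vice versa). The diff below checks the flag that matches the query type (remote.bin, remote.arrow, remote.density, remote.stats) and consolidates the duplicated local-processing fallback into a localPlan helper. The standalone Scala sketch that follows only illustrates the flag-per-query-type idea; RemoteScans, QueryType, and useRemoteProcessing are hypothetical stand-ins, not GeoMesa API.

// Minimal, self-contained sketch -- not GeoMesa code.
object RemoteFlagSketch {

  // mirrors the four remote-processing toggles referenced in the diff (ds.config.remote.*)
  case class RemoteScans(bin: Boolean, arrow: Boolean, density: Boolean, stats: Boolean)

  sealed trait QueryType
  case object BinQuery     extends QueryType
  case object ArrowQuery   extends QueryType
  case object DensityQuery extends QueryType
  case object StatsQuery   extends QueryType

  // before the fix every join-index branch consulted `bin`;
  // after the fix each query type consults its own flag
  def useRemoteProcessing(remote: RemoteScans, query: QueryType): Boolean = query match {
    case BinQuery     => remote.bin
    case ArrowQuery   => remote.arrow
    case DensityQuery => remote.density
    case StatsQuery   => remote.stats
  }

  def main(args: Array[String]): Unit = {
    // arrow scans disabled server-side: an arrow query now falls back to local processing
    val remote = RemoteScans(bin = true, arrow = false, density = true, stats = true)
    println(useRemoteProcessing(remote, ArrowQuery)) // prints: false
  }
}

With this mapping, disabling only remote.arrow sends arrow queries through the local fallback while bin, density, and stats queries keep running server-side.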

1 file changed: +47 -71 lines changed

geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloJoinIndexAdapter.scala

Lines changed: 47 additions & 71 deletions
@@ -73,24 +73,33 @@ object AccumuloJoinIndexAdapter {
 
     // TODO seems like this should be using 'schema' here, which may be a reduced version of the indexSft due to col groups
     val indexSft = index.indexSft
-    lazy val sort = hints.getSortFields
-    lazy val max = hints.getMaxFeatures
-    lazy val project = hints.getProjection
+    val transform = hints.getTransformSchema
 
     // for queries that don't require a join, creates a regular batch scan plan
     def plan(
         iters: Seq[IteratorSetting],
         kvsToFeatures: ResultsToFeatures[Entry[Key, Value]],
-        reduce: Option[FeatureReducer]): BatchScanPlan =
-      BatchScanPlan(filter, tables, ranges, iters, colFamily, kvsToFeatures, reduce, sort, max, project, numThreads)
-
-    val transform = hints.getTransformSchema
+        reduce: Option[FeatureReducer]): BatchScanPlan = {
+      // add the attribute-level vis iterator if necessary
+      val iterators = visibilityIter(index) ++ iters
+      val sort = hints.getSortFields
+      val max = hints.getMaxFeatures
+      val project = hints.getProjection
+      BatchScanPlan(filter, tables, ranges, iterators, colFamily, kvsToFeatures, reduce, sort, max, project, numThreads)
+    }
 
     // used when remote processing is disabled
-    lazy val returnSchema = hints.getTransformSchema.getOrElse(indexSft)
-    lazy val fti = visibilityIter(index) ++ FilterTransformIterator.configure(indexSft, index, ecql, hints).toSeq
-    lazy val resultsToFeatures = AccumuloResultsToFeatures(index, returnSchema)
-    lazy val localReducer = Some(new LocalTransformReducer(returnSchema, None, None, None, hints))
+    def localPlan(overrides: Option[Seq[IteratorSetting]] = None): BatchScanPlan = {
+      val returnSchema = transform.getOrElse(indexSft)
+      if (hints.isSkipReduce) {
+        // override the return sft to reflect what we're actually returning,
+        // since the bin sft is only created in the local reduce step
+        hints.put(QueryHints.Internal.RETURN_SFT, returnSchema)
+      }
+      val iters = overrides.getOrElse(FilterTransformIterator.configure(indexSft, index, ecql, hints).toSeq)
+      val localReducer = Some(new LocalTransformReducer(returnSchema, None, None, None, hints))
+      plan(iters, AccumuloResultsToFeatures(index, returnSchema), localReducer)
+    }
 
     val qp = if (hints.isBinQuery) {
       // check to see if we can execute against the index values
@@ -99,16 +108,10 @@ object AccumuloJoinIndexAdapter {
           hints.getBinLabelField.forall(indexSft.indexOf(_) != -1) &&
           index.supportsFilter(ecql)) {
         if (ds.config.remote.bin) {
-          val iter = BinAggregatingIterator.configure(indexSft, index, ecql, hints)
-          val iters = visibilityIter(index) :+ iter
-          plan(iters, new AccumuloBinResultsToFeatures(), None)
+          val binIter = BinAggregatingIterator.configure(indexSft, index, ecql, hints)
+          plan(Seq(binIter), new AccumuloBinResultsToFeatures(), None)
         } else {
-          if (hints.isSkipReduce) {
-            // override the return sft to reflect what we're actually returning,
-            // since the bin sft is only created in the local reduce step
-            hints.hints.put(QueryHints.Internal.RETURN_SFT, returnSchema)
-          }
-          plan(fti, resultsToFeatures, localReducer)
+          localPlan()
         }
       } else {
         // have to do a join against the record table
@@ -117,40 +120,30 @@ object AccumuloJoinIndexAdapter {
     } else if (hints.isArrowQuery) {
       // check to see if we can execute against the index values
       if (index.canUseIndexSchema(ecql, transform)) {
-        if (ds.config.remote.bin) {
-          val (iter, reduce) = ArrowIterator.configure(indexSft, index, ds.stats, filter.filter, ecql, hints)
-          val iters = visibilityIter(index) :+ iter
-          plan(iters, new AccumuloArrowResultsToFeatures(), Some(reduce))
+        if (ds.config.remote.arrow) {
+          val (arrowIter, reduce) = ArrowIterator.configure(indexSft, index, ds.stats, filter.filter, ecql, hints)
+          plan(Seq(arrowIter), new AccumuloArrowResultsToFeatures(), Some(reduce))
         } else {
-          if (hints.isSkipReduce) {
-            // override the return sft to reflect what we're actually returning,
-            // since the arrow sft is only created in the local reduce step
-            hints.hints.put(QueryHints.Internal.RETURN_SFT, returnSchema)
-          }
-          plan(fti, resultsToFeatures, localReducer)
+          localPlan()
         }
       } else if (index.canUseIndexSchemaPlusKey(ecql, transform)) {
         val transformSft = transform.getOrElse {
           throw new IllegalStateException("Must have a transform for attribute key plus value scan")
         }
         // first filter and apply the transform
         val filterTransformIter = FilterTransformIterator.configure(indexSft, index, ecql, hints, 23).get
-        // clear the transforms as we've already accounted for them
-        hints.clearTransforms()
         // next add the attribute value from the row key
         val rowValueIter = AttributeKeyValueIterator.configure(index.asInstanceOf[AttributeIndex], transformSft, 24)
-        if (ds.config.remote.bin) {
+        if (ds.config.remote.arrow) {
+          // clear the transforms as we've already accounted for them
+          val newHints = new Hints(hints)
+          newHints.clearTransforms()
           // finally apply the arrow iterator on the resulting features
-          val (iter, reduce) = ArrowIterator.configure(transformSft, index, ds.stats, None, None, hints)
-          val iters = visibilityIter(index) ++ Seq(filterTransformIter, rowValueIter, iter)
+          val (arrowIter, reduce) = ArrowIterator.configure(transformSft, index, ds.stats, None, None, newHints)
+          val iters = Seq(filterTransformIter, rowValueIter, arrowIter)
           plan(iters, new AccumuloArrowResultsToFeatures(), Some(reduce))
         } else {
-          if (hints.isSkipReduce) {
-            // override the return sft to reflect what we're actually returning,
-            // since the arrow sft is only created in the local reduce step
-            hints.hints.put(QueryHints.Internal.RETURN_SFT, returnSchema)
-          }
-          plan(fti, resultsToFeatures, localReducer)
+          localPlan(Some(Seq(filterTransformIter, rowValueIter)))
         }
       } else {
         // have to do a join against the record table
@@ -160,8 +153,7 @@ object AccumuloJoinIndexAdapter {
       // check to see if we can execute against the index values
       val weightIsAttribute = hints.getDensityWeight.contains(index.attributes.head)
       if (index.supportsFilter(ecql) && (weightIsAttribute || hints.getDensityWeight.forall(indexSft.indexOf(_) != -1))) {
-        if (ds.config.remote.bin) {
-          val visIter = visibilityIter(index)
+        if (ds.config.remote.density) {
           val iters = if (weightIsAttribute) {
             // create a transform sft with the attribute added
             val transform = {
@@ -181,18 +173,13 @@ object AccumuloJoinIndexAdapter {
             // priority needs to be between vis iter (21) and density iter (25)
             val keyValueIter = AttributeKeyValueIterator.configure(index.asInstanceOf[AttributeIndex], transform, 23)
             val densityIter = DensityIterator.configure(transform, index, ecql, hints)
-            visIter :+ keyValueIter :+ densityIter
+            Seq(keyValueIter, densityIter)
           } else {
-            visIter :+ DensityIterator.configure(indexSft, index, ecql, hints)
+            Seq(DensityIterator.configure(indexSft, index, ecql, hints))
           }
           plan(iters, new AccumuloDensityResultsToFeatures(), None)
         } else {
-          if (hints.isSkipReduce) {
-            // override the return sft to reflect what we're actually returning,
-            // since the density sft is only created in the local reduce step
-            hints.hints.put(QueryHints.Internal.RETURN_SFT, returnSchema)
-          }
-          plan(fti, resultsToFeatures, localReducer)
+          localPlan()
         }
       } else {
         // have to do a join against the record table
@@ -201,18 +188,12 @@ object AccumuloJoinIndexAdapter {
     } else if (hints.isStatsQuery) {
       // check to see if we can execute against the index values
       if (Try(Stat(indexSft, hints.getStatsQuery)).isSuccess && index.supportsFilter(ecql)) {
-        if (ds.config.remote.bin) {
-          val iter = StatsIterator.configure(indexSft, index, ecql, hints)
-          val iters = visibilityIter(index) :+ iter
+        if (ds.config.remote.stats) {
+          val statsIter = StatsIterator.configure(indexSft, index, ecql, hints)
           val reduce = Some(StatsScan.StatsReducer(indexSft, hints))
-          plan(iters, new AccumuloStatsResultsToFeatures(), reduce)
+          plan(Seq(statsIter), new AccumuloStatsResultsToFeatures(), reduce)
         } else {
-          if (hints.isSkipReduce) {
-            // override the return sft to reflect what we're actually returning,
-            // since the stats sft is only created in the local reduce step
-            hints.hints.put(QueryHints.Internal.RETURN_SFT, returnSchema)
-          }
-          plan(fti, resultsToFeatures, localReducer)
+          localPlan()
         }
       } else {
         // have to do a join against the record table
@@ -225,22 +206,17 @@ object AccumuloJoinIndexAdapter {
         val transformSft = transform.getOrElse {
           throw new IllegalStateException("Must have a transform for attribute value scan")
         }
-        val iter = FilterTransformIterator.configure(indexSft, index, ecql, hints.getTransform, hints.getSampling)
-        // add the attribute-level vis iterator if necessary
-        val iters = visibilityIter(index) ++ iter.toSeq
+        val fti = FilterTransformIterator.configure(indexSft, index, ecql, hints.getTransform, hints.getSampling)
         // need to use transform to convert key/values
         val toFeatures = AccumuloResultsToFeatures(index, transformSft)
-        plan(iters, toFeatures, None)
+        plan(fti.toSeq, toFeatures, None)
       } else if (index.canUseIndexSchemaPlusKey(ecql, transform)) {
         // we can use the index PLUS the value
         val transformSft = transform.getOrElse {
           throw new IllegalStateException("Must have a transform for attribute key plus value scan")
         }
-        val iter = FilterTransformIterator.configure(indexSft, index, ecql, hints.getTransform, hints.getSampling)
-        // add the attribute-level vis iterator if necessary
-        val iters =
-          visibilityIter(index) ++ iter.toSeq :+
-            AttributeKeyValueIterator.configure(index.asInstanceOf[AttributeIndex], transformSft)
+        val fti = FilterTransformIterator.configure(indexSft, index, ecql, hints.getTransform, hints.getSampling)
+        val iters = fti.toSeq :+ AttributeKeyValueIterator.configure(index.asInstanceOf[AttributeIndex], transformSft)
         // need to use transform to convert key/values
         val toFeatures = AccumuloResultsToFeatures(index, transformSft)
         plan(iters, toFeatures, None)
@@ -301,7 +277,7 @@ object AccumuloJoinIndexAdapter {
     if (hints.isSkipReduce) {
       // override the return sft to reflect what we're actually returning,
       // since the arrow sft is only created in the local reduce step
-      hints.hints.put(QueryHints.Internal.RETURN_SFT, resultSft)
+      hints.put(QueryHints.Internal.RETURN_SFT, resultSft)
     }
 
     val recordTables = recordIndex.getTablesForQuery(filter.filter)
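
A second behavioral detail sits in the arrow "key plus value" branch: the old code called hints.clearTransforms() on the shared hints before checking the remote flag, mutating state that the rest of query planning (including the local fallback) still relies on. The new code clears transforms only on a copy (new Hints(hints)) that is passed to the remote arrow iterator. The sketch below shows the copy-before-mutate idea with stand-in types; Hints, Transforms, and hintsForRemoteArrowScan are hypothetical names, not the GeoTools/GeoMesa API.

// Minimal, self-contained sketch -- not GeoMesa code.
import scala.collection.mutable

object HintsCopySketch {

  // stand-in for org.geotools.util.factory.Hints; the key name is made up
  type Hints = mutable.Map[String, Any]
  val Transforms = "TRANSFORMS"

  // analogous to: val newHints = new Hints(hints); newHints.clearTransforms()
  def hintsForRemoteArrowScan(shared: Hints): Hints = {
    val scanHints = shared.clone() // copy first, never mutate the shared hints
    scanHints.remove(Transforms)   // transform already applied by the filter/transform iterator
    scanHints
  }

  def main(args: Array[String]): Unit = {
    val hints: Hints = mutable.Map(Transforms -> "name,dtg")
    val scanHints = hintsForRemoteArrowScan(hints)
    println(scanHints.contains(Transforms)) // false - cleared for the remote arrow iterator
    println(hints.contains(Transforms))     // true  - the rest of query planning still sees it
  }
}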
