Skip to content

Commit be3971d

Browse files
authored
GEOMESA-3535 FSDS - Split out io operations (#3443)
* New modules `geomesa-fs-storage-parquet-io` and `geomesa-fs-storage-orc-io`
1 parent 9a1e059 commit be3971d

File tree

79 files changed

+937
-496
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

79 files changed

+937
-496
lines changed

.github/workflows/build-and-test.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,13 @@ permissions: # added using https://github.com/step-security/secure-repo
1515
env:
1616
JAVA_VERSION: 17
1717
MAVEN_CLI_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false --batch-mode -Dlicense.skip=true
18-
MAVEN_COMPILE_ARGS: clean install -Dmaven.test.skip -Dmaven.assembly.skip=true -Dmaven.source.skip -Pskip-spark-runtimes -T2
18+
MAVEN_COMPILE_ARGS: clean install -Dmaven.test.skip -Dmaven.assembly.skip=true -Dmaven.source.skip -Pskip-spark-runtimes -T4
1919
MAVEN_SPARK_ARGS: clean install -Dmaven.test.skip -Dmaven.assembly.skip=true -Dmaven.source.skip -T2
2020
MAVEN_TEST_ARGS: test -Dtest.fork.count=1
2121
MAVEN_IT_ARGS: test failsafe:integration-test failsafe:verify -DskipTests -Dtest.fork.count=1
2222
MAVEN_ASSEMBLY_ARGS: assembly:single@make-assembly -Dassembly.ignoreMissingDescriptor=true -T2
2323
MAVEN_SCALADOC_ARGS: generate-sources scala:doc-jar
24-
MAVEN_JAVADOC_ARGS: generate-sources javadoc:jar
24+
MAVEN_JAVADOC_ARGS: generate-sources javadoc:jar -Psite
2525
MAVEN_DASH_ARGS: org.eclipse.dash:license-tool-plugin:license-check -Ddash.fail=true -Ddash.projectId=locationtech.geomesa -DexcludeGroupIds=org.locationtech.geomesa
2626

2727
jobs:
@@ -90,7 +90,7 @@ jobs:
9090
- name: features
9191
selector: -f geomesa-features
9292
- name: fs
93-
selector: -f geomesa-fs -pl '!geomesa-fs-storage,!geomesa-fs-storage/geomesa-fs-storage-api,!geomesa-fs-storage/geomesa-fs-storage-common,!geomesa-fs-storage/geomesa-fs-storage-convert,!geomesa-fs-storage/geomesa-fs-storage-orc,!geomesa-fs-storage/geomesa-fs-storage-parquet'
93+
selector: -f geomesa-fs -pl '!geomesa-fs-storage,!geomesa-fs-storage/geomesa-fs-storage-api,!geomesa-fs-storage/geomesa-fs-storage-common,!geomesa-fs-storage/geomesa-fs-storage-convert,!geomesa-fs-storage/geomesa-fs-storage-orc,!geomesa-fs-storage/geomesa-fs-storage-orc-io,!geomesa-fs-storage/geomesa-fs-storage-parquet,!geomesa-fs-storage/geomesa-fs-storage-parquet-io'
9494
- name: fs-storage
9595
selector: -f geomesa-fs/geomesa-fs-storage
9696
- name: gt-pg15

docs/_static/css/theme_custom.css

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,11 @@ a:hover {
6262
color: #6f91b8;
6363
}
6464

65+
details summary {
66+
cursor: pointer;
67+
padding-bottom: 20px;
68+
}
69+
6570
.rst-content tt.literal,.rst-content code.literal {
6671
color: #2a6ebb
6772
}

docs/user/filesystem/index_config.rst

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -202,31 +202,40 @@ Configuring Custom Observer Callbacks
202202

203203
The FSDS provides a mechanism to add custom handling during file writing. Users can implement observer factories,
204204
which will be invoked for each new file that is created. Observer factories must extend the trait
205-
``FileSystemObserverFactory``:
205+
``org.locationtech.geomesa.fs.storage.api.observer.FileSystemObserverFactory``:
206206

207207
.. code-block:: scala
208208
209-
package org.locationtech.geomesa.fs.storage.common.observer
209+
package org.locationtech.geomesa.fs.storage.api.observer
210210
211-
trait FileSystemObserverFactory extends Closeable {
211+
import org.apache.hadoop.conf.Configuration
212+
import org.apache.hadoop.fs.Path
213+
import org.geotools.api.feature.simple.SimpleFeatureType
212214
213-
/**
214-
* Called once after instantiating the factory
215-
*
216-
* @param conf hadoop configuration
217-
* @param root root path
218-
* @param sft simple feature type
219-
*/
220-
def init(conf: Configuration, root: Path, sft: SimpleFeatureType): Unit
215+
import java.io.Closeable
221216
222217
/**
223-
* Create an observer for the given path
224-
*
225-
* @param path file path being written
226-
* @return
218+
* Factory for observing file writes
227219
*/
228-
def apply(path: Path): FileSystemObserver
229-
}
220+
trait FileSystemObserverFactory extends Closeable {
221+
222+
/**
223+
* Called once after instantiating the factory
224+
*
225+
* @param conf hadoop configuration
226+
* @param root root path
227+
* @param sft simple feature type
228+
*/
229+
def init(conf: Configuration, root: Path, sft: SimpleFeatureType): Unit
230+
231+
/**
232+
* Create an observer for the given path
233+
*
234+
* @param path file path being written
235+
* @return
236+
*/
237+
def apply(path: Path): FileSystemObserver
238+
}
230239
231240
.. note::
232241

docs/user/upgrade.rst

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,46 @@ Deprecated Classes
128128

129129
* ``org.locationtech.geomesa.fs.data.FileSystemDataStoreFactory.FileSystemDataStoreParams`` - replaced with
130130
``org.locationtech.geomesa.fs.data.FileSystemDataStoreParams``
131+
* ``org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserverFactory`` - replaced with
132+
``org.locationtech.geomesa.fs.storage.api.observer.FileSystemObserverFactory``
133+
134+
Internal API Changes
135+
--------------------
136+
137+
GeoMesa does not have a well-defined public API. This section details classes and methods that have been changed or removed,
138+
but are not meant to be publicly available.
139+
140+
.. raw:: html
141+
142+
<details>
143+
<summary><b>Click here to see all API changes</b></summary>
144+
145+
Relocated Classes
146+
^^^^^^^^^^^^^^^^^
147+
148+
* ``org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.FileSystemPathReader`` ->
149+
``org.locationtech.geomesa.fs.storage.api.FileSystemStorage.FileSystemPathReader``
150+
* ``org.locationtech.geomesa.fs.storage.orc.OrcFileSystemReader`` -> ``org.locationtech.geomesa.fs.storage.orc.io``
151+
* ``org.locationtech.geomesa.fs.storage.orc.OrcFileSystemStorage`` type description methods moved to ->
152+
``org.locationtech.geomesa.fs.storage.orc.io.SimpleFeatureTypeDescription``
153+
* ``org.locationtech.geomesa.fs.storage.orc.OrcFileSystemWriter`` -> ``org.locationtech.geomesa.fs.storage.orc.io``
154+
* ``org.locationtech.geomesa.fs.storage.orc.utils.OrcAttributeWriter`` -> ``org.locationtech.geomesa.fs.storage.orc.io``
155+
* ``org.locationtech.geomesa.fs.storage.orc.utils.OrcSearchArguments`` -> ``org.locationtech.geomesa.fs.storage.orc.io``
156+
* ``org.locationtech.geomesa.fs.storage.parquet.ParquetFileSystemStorage.ParquetFileSystemWriter`` ->
157+
``org.locationtech.geomesa.fs.storage.parquet.io``
158+
* ``org.locationtech.geomesa.fs.storage.parquet.ParquetPathReader`` ->
159+
``org.locationtech.geomesa.fs.storage.parquet.io.ParquetFileSystemReader``
160+
* ``org.locationtech.geomesa.fs.storage.parquet.SimpleFeatureParquetWriter`` -> ``org.locationtech.geomesa.fs.storage.parquet.io``
161+
* ``org.locationtech.geomesa.utils.metrics.MetricsTags`` -> ``org.locationtech.geomesa.metrics.micrometer.utils.TagUtils``
162+
163+
Removed Classes
164+
^^^^^^^^^^^^^^^
165+
166+
* ``org.locationtech.geomesa.utils.metrics.LatencyMetrics``
167+
168+
.. raw:: html
169+
170+
</details>
131171

132172
Version 5.4.0 Upgrade Guide
133173
+++++++++++++++++++++++++++

geomesa-accumulo/geomesa-accumulo-tools/src/test/scala/org/locationtech/geomesa/accumulo/tools/export/AccumuloExportCommandTest.scala

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,8 @@ import org.locationtech.geomesa.convert.text.DelimitedTextConverter
2828
import org.locationtech.geomesa.convert2.SimpleFeatureConverter
2929
import org.locationtech.geomesa.features.ScalaSimpleFeature
3030
import org.locationtech.geomesa.features.avro.io.AvroDataFileReader
31-
import org.locationtech.geomesa.fs.storage.common.jobs.StorageConfiguration
32-
import org.locationtech.geomesa.fs.storage.orc.OrcFileSystemReader
33-
import org.locationtech.geomesa.fs.storage.parquet.ParquetPathReader
31+
import org.locationtech.geomesa.fs.storage.orc.io.OrcFileSystemReader
32+
import org.locationtech.geomesa.fs.storage.parquet.io.{ParquetFileSystemReader, SimpleFeatureParquetSchema}
3433
import org.locationtech.geomesa.tools.`export`.ExportFormat
3534
import org.locationtech.geomesa.utils.bin.BinaryOutputEncoder
3635
import org.locationtech.geomesa.utils.collection.SelfClosingIterator
@@ -284,8 +283,8 @@ class AccumuloExportCommandTest extends TestWithDataStore {
284283
def readParquet(file: String, sft: SimpleFeatureType): Seq[SimpleFeature] = {
285284
val path = new Path(PathUtils.getUrl(file).toURI)
286285
val conf = new Configuration()
287-
StorageConfiguration.setSft(conf, sft)
288-
WithClose(new ParquetPathReader(conf, sft, FilterCompat.NOOP, None, _ => true, None).read(path)) { iter =>
286+
SimpleFeatureParquetSchema.setSft(conf, sft)
287+
WithClose(new ParquetFileSystemReader(conf, sft, FilterCompat.NOOP, None, _ => true, None).read(path)) { iter =>
289288
iter.map(ScalaSimpleFeature.copy).toList
290289
}
291290
}

geomesa-convert/geomesa-convert-common/src/main/scala/org/locationtech/geomesa/convert2/AbstractCompositeConverter.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ import org.locationtech.geomesa.convert.{EnrichmentCache, EvaluationContext}
1818
import org.locationtech.geomesa.convert2.AbstractCompositeConverter.{CompositeEvaluationContext, PredicateContext}
1919
import org.locationtech.geomesa.convert2.metrics.ConverterMetrics
2020
import org.locationtech.geomesa.convert2.transforms.Predicate
21+
import org.locationtech.geomesa.metrics.micrometer.utils.TagUtils
2122
import org.locationtech.geomesa.utils.collection.CloseableIterator
2223
import org.locationtech.geomesa.utils.io.CloseWithLogging
23-
import org.locationtech.geomesa.utils.metrics.MetricsTags
2424

2525
import java.io.InputStream
2626
import java.time.Duration
@@ -33,7 +33,7 @@ abstract class AbstractCompositeConverter[T <: AnyRef](
3333
delegates: Seq[(Predicate, ParsingConverter[T])]
3434
) extends SimpleFeatureConverter with LazyLogging {
3535

36-
private val tags = MetricsTags.typeNameTag(sft)
36+
private val tags = TagUtils.typeNameTag(sft.getTypeName)
3737

3838
private val routingTimer =
3939
Timer.builder(ConverterMetrics.name("predicate.eval"))

geomesa-convert/geomesa-convert-common/src/main/scala/org/locationtech/geomesa/convert2/AbstractConverter.scala

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,24 +11,25 @@ package org.locationtech.geomesa.convert2
1111
import com.codahale.metrics.Counter
1212
import com.typesafe.config.Config
1313
import com.typesafe.scalalogging.LazyLogging
14-
import io.micrometer.core.instrument.{Metrics, Timer}
14+
import io.micrometer.core.instrument.{DistributionSummary, Metrics, Tag, Timer}
1515
import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
1616
import org.geotools.util.factory.Hints
1717
import org.locationtech.geomesa.convert.EvaluationContext.{EvaluationError, StatefulEvaluationContext, Stats}
1818
import org.locationtech.geomesa.convert.Modes.{ErrorMode, ParseMode}
1919
import org.locationtech.geomesa.convert.{EnrichmentCache, EvaluationContext}
20-
import org.locationtech.geomesa.convert2.AbstractConverter.{BasicField, addDependencies, topologicalOrder}
20+
import org.locationtech.geomesa.convert2.AbstractConverter.{BasicField, LatencyMetrics, addDependencies, topologicalOrder}
2121
import org.locationtech.geomesa.convert2.metrics.ConverterMetrics
2222
import org.locationtech.geomesa.convert2.transforms.Expression
2323
import org.locationtech.geomesa.convert2.validators.{IdValidatorFactory, SimpleFeatureValidator}
2424
import org.locationtech.geomesa.features.ScalaSimpleFeature
25+
import org.locationtech.geomesa.metrics.micrometer.utils.{GaugeUtils, TagUtils}
2526
import org.locationtech.geomesa.utils.collection.CloseableIterator
2627
import org.locationtech.geomesa.utils.io.CloseWithLogging
27-
import org.locationtech.geomesa.utils.metrics.{LatencyMetrics, MetricsTags}
2828

2929
import java.io.{IOException, InputStream}
3030
import java.nio.charset.{Charset, StandardCharsets}
3131
import java.time.Duration
32+
import java.util.Date
3233
import java.util.concurrent.TimeUnit
3334
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
3435

@@ -134,8 +135,8 @@ abstract class AbstractConverter[T, C <: ConverterConfig, F <: Field, O <: Conve
134135
private val metrics = ConverterMetrics(sft, options.reporters)
135136

136137
private val tags = config match {
137-
case ConverterName(name) => MetricsTags.typeNameTag(sft).and(ConverterMetrics.converterNameTag(name))
138-
case _ => MetricsTags.typeNameTag(sft)
138+
case ConverterName(name) => TagUtils.typeNameTag(sft.getTypeName).and(ConverterMetrics.converterNameTag(name))
139+
case _ => TagUtils.typeNameTag(sft.getTypeName)
139140
}
140141

141142
private val validators =
@@ -159,7 +160,7 @@ abstract class AbstractConverter[T, C <: ConverterConfig, F <: Field, O <: Conve
159160
.maximumExpectedValue(Duration.ofMillis(2))
160161
.register(Metrics.globalRegistry)
161162

162-
private val latency = sft.getDtgIndex.map(i => new LatencyMetrics(i, ConverterMetrics.MetricsNamePrefix.get, tags))
163+
private val latency = sft.getDtgIndex.map(i => new LatencyMetrics(i, tags))
163164

164165
override def targetSft: SimpleFeatureType = sft
165166

@@ -355,6 +356,39 @@ object AbstractConverter {
355356
}
356357
}
357358

359+
/**
360+
* Latency metrics tracker
361+
*
362+
* @param dtgIndex index of the date attribute to track in the feature type
363+
* @param tags metrics tags
364+
*/
365+
private class LatencyMetrics(dtgIndex: Int, tags: java.lang.Iterable[Tag]) {
366+
367+
private val date = GaugeUtils.timeGauge(ConverterMetrics.name("dtg.latest"), tags)
368+
369+
private val latency =
370+
DistributionSummary.builder(ConverterMetrics.name("dtg.latency"))
371+
.tags(tags)
372+
.publishPercentileHistogram()
373+
.baseUnit("milliseconds")
374+
.minimumExpectedValue(1d)
375+
.maximumExpectedValue(Duration.ofDays(1).toMillis.toDouble)
376+
.register(Metrics.globalRegistry)
377+
378+
/**
379+
* Record latency for a feature
380+
*
381+
* @param feature feature
382+
*/
383+
def apply(feature: SimpleFeature): Unit = {
384+
val dtg = feature.getAttribute(dtgIndex).asInstanceOf[Date]
385+
if (dtg != null) {
386+
date.set(dtg.getTime)
387+
latency.record(System.currentTimeMillis() - dtg.getTime)
388+
}
389+
}
390+
}
391+
358392
/**
359393
* Add the dependencies of a field to a graph
360394
*

geomesa-convert/geomesa-convert-common/src/main/scala/org/locationtech/geomesa/convert2/composite/CompositeConverter.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ import org.locationtech.geomesa.convert.EvaluationContext.Stats
1616
import org.locationtech.geomesa.convert2.AbstractCompositeConverter.{CompositeEvaluationContext, PredicateContext}
1717
import org.locationtech.geomesa.convert2.SimpleFeatureConverter
1818
import org.locationtech.geomesa.convert2.transforms.Predicate
19+
import org.locationtech.geomesa.metrics.micrometer.utils.TagUtils
1920
import org.locationtech.geomesa.utils.collection.CloseableIterator
2021
import org.locationtech.geomesa.utils.io.CloseWithLogging
21-
import org.locationtech.geomesa.utils.metrics.MetricsTags
2222

2323
import java.io.{ByteArrayInputStream, InputStream}
2424
import java.nio.charset.StandardCharsets
@@ -28,7 +28,7 @@ import scala.util.Try
2828
class CompositeConverter(val targetSft: SimpleFeatureType, delegates: Seq[(Predicate, SimpleFeatureConverter)])
2929
extends SimpleFeatureConverter {
3030

31-
private val tags = MetricsTags.typeNameTag(targetSft)
31+
private val tags = TagUtils.typeNameTag(targetSft.getTypeName)
3232

3333
override def createEvaluationContext(globalParams: Map[String, Any]): EvaluationContext =
3434
createEvaluationContext(delegates.map(_._2.createEvaluationContext(globalParams)), Stats(tags))

geomesa-convert/geomesa-convert-parquet/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
</dependency>
2323
<dependency>
2424
<groupId>org.locationtech.geomesa</groupId>
25-
<artifactId>geomesa-fs-storage-parquet_${scala.binary.version}</artifactId>
25+
<artifactId>geomesa-fs-storage-parquet-io_${scala.binary.version}</artifactId>
2626
</dependency>
2727
<dependency>
2828
<groupId>org.apache.parquet</groupId>

geomesa-convert/geomesa-convert-parquet/src/main/scala/org/locationtech/geomesa/convert/parquet/ParquetConverterFactory.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import org.locationtech.geomesa.convert2.{AbstractConverterFactory, TypeInferenc
2929
import org.locationtech.geomesa.fs.storage.parquet.io.GeoParquetMetadata.{ColumnMetadata, GeoParquetColumnEncoding, GeoParquetColumnType}
3030
import org.locationtech.geomesa.fs.storage.parquet.io.GeometrySchema.GeometryEncoding.GeoParquetNative
3131
import org.locationtech.geomesa.fs.storage.parquet.io.{GeoParquetMetadata, GeometrySchema, SimpleFeatureParquetSchema}
32-
import org.locationtech.geomesa.security.SecurityUtils
3332
import org.locationtech.geomesa.utils.geotools.ObjectType
3433
import org.locationtech.geomesa.utils.geotools.ObjectType.ObjectType
3534
import org.locationtech.geomesa.utils.io.PathUtils
@@ -81,7 +80,7 @@ class ParquetConverterFactory
8180
val id = Expression(s"avroPath($$0, '/${SimpleFeatureParquetSchema.FeatureIdField}')")
8281
val userData =
8382
if (parquet.hasVisibilities) {
84-
Map(SecurityUtils.FEATURE_VISIBILITY -> Expression(s"avroPath($$0, '/${SimpleFeatureParquetSchema.VisibilitiesField}')"))
83+
Map("geomesa.feature.visibility" -> Expression(s"avroPath($$0, '/${SimpleFeatureParquetSchema.VisibilitiesField}')"))
8584
} else {
8685
Map.empty[String, Expression]
8786
}

0 commit comments

Comments
 (0)