Commit 444a9fd

GEOMESA-3411 FSDS - Fix path cache registration order (#3249)
1 parent 41e12b3 commit 444a9fd
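
This commit moves PathCache registration from file-allocation time to writer close time, so that the cache only ever records paths whose files actually exist. A condensed sketch of the change, stitched together from the diffs below (illustrative, not a compilable excerpt):

  // before: AbstractFileSystemStorage registered the path as soon as it
  // was allocated, before the writer had created the file
  def pathAndObserver: WriterConfig = {
    val path = StorageUtils.nextFile(context.root, partition, metadata.leafStorage, extension, fileType)
    PathCache.register(context.fs, path) // too early: the file does not exist yet
    // ...
  }

  // after: each writer registers the path in its own close(), once the
  // file has been fully written, e.g. in OrcFileSystemWriter:
  override def close(): Unit = {
    // ...
    CloseQuietly.raise(Seq(writer, observer))
    PathCache.register(context.fs, file) // the file now exists
  }

To support this, the ORC and Parquet writers now take a FileSystemContext (which carries the Hadoop configuration and a file-system handle) instead of a bare Configuration.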

7 files changed: +55 -41 lines changed


geomesa-features/geomesa-feature-exporters/src/main/scala/org/locationtech/geomesa/features/exporters/FileSystemExporter.scala
Lines changed: 6 additions & 5 deletions

@@ -12,8 +12,8 @@ import com.typesafe.scalalogging.LazyLogging
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
+import org.locationtech.geomesa.fs.storage.api.FileSystemContext
 import org.locationtech.geomesa.fs.storage.api.FileSystemStorage.FileSystemWriter
-import org.locationtech.geomesa.fs.storage.common.jobs.StorageConfiguration
 import org.locationtech.geomesa.fs.storage.orc.OrcFileSystemWriter
 import org.locationtech.geomesa.fs.storage.parquet.ParquetFileSystemStorage.ParquetFileSystemWriter
 import org.locationtech.geomesa.utils.io.PathUtils
@@ -52,22 +52,23 @@ object FileSystemExporter extends LazyLogging {
 
   class ParquetFileSystemExporter(path: String) extends FileSystemExporter {
     override protected def createWriter(sft: SimpleFeatureType): FileSystemWriter = {
+      // use PathUtils.getUrl to handle local files, otherwise default can be in hdfs
+      val file = new Path(PathUtils.getUrl(path).toURI)
       val conf = new Configuration()
-      StorageConfiguration.setSft(conf, sft)
       try { Class.forName("org.xerial.snappy.Snappy") } catch {
         case _: ClassNotFoundException =>
           logger.warn("SNAPPY compression is not available on the classpath - falling back to GZIP")
           conf.set("parquet.compression", "GZIP")
       }
-      // use PathUtils.getUrl to handle local files, otherwise default can be in hdfs
-      new ParquetFileSystemWriter(sft, new Path(PathUtils.getUrl(path).toURI), conf)
+      new ParquetFileSystemWriter(sft, FileSystemContext(file, conf), file)
     }
   }
 
   class OrcFileSystemExporter(path: String) extends FileSystemExporter {
     override protected def createWriter(sft: SimpleFeatureType): FileSystemWriter = {
       // use PathUtils.getUrl to handle local files, otherwise default can be in hdfs
-      new OrcFileSystemWriter(sft, new Configuration(), new Path(PathUtils.getUrl(path).toURI))
+      val file = new Path(PathUtils.getUrl(path).toURI)
+      new OrcFileSystemWriter(sft, FileSystemContext(file, new Configuration()), file)
     }
   }
 }
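
Note: the exporters now resolve the output path first and build a FileSystemContext from it, since the writers need the context's file-system handle (context.fs) to register the finished file, in addition to its configuration (context.conf).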

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/AbstractFileSystemStorage.scala
Lines changed: 0 additions & 1 deletion

@@ -233,7 +233,6 @@ abstract class AbstractFileSystemStorage(
 
     def pathAndObserver: WriterConfig = {
       val path = StorageUtils.nextFile(context.root, partition, metadata.leafStorage, extension, fileType)
-      PathCache.register(context.fs, path)
       val updateObserver = new UpdateObserver(partition, path, action)
       val observer = if (observers.isEmpty) { updateObserver } else {
         new CompositeObserver(observers.map(_.apply(path)).+:(updateObserver))

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/utils/PathCache.scala
Lines changed: 1 addition & 1 deletion

@@ -50,7 +50,7 @@ object PathCache {
   )
 
   /**
-    * * Register a path as existing
+    * Register a path as existing
     *
     * @param fs file system
     * @param path path

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/main/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemStorage.scala
Lines changed: 1 addition & 1 deletion

@@ -33,7 +33,7 @@ class OrcFileSystemStorage(context: FileSystemContext, metadata: StorageMetadata
     extends AbstractFileSystemStorage(context, metadata, OrcFileSystemStorage.FileExtension) {
 
   override protected def createWriter(file: Path, observer: FileSystemObserver): FileSystemWriter =
-    new OrcFileSystemWriter(metadata.sft, context.conf, file, observer)
+    new OrcFileSystemWriter(metadata.sft, context, file, observer)
 
   override protected def createReader(
       filter: Option[Filter],

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/main/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemWriter.scala
Lines changed: 5 additions & 3 deletions

@@ -8,28 +8,29 @@
 
 package org.locationtech.geomesa.fs.storage.orc
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.orc.OrcFile
 import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
+import org.locationtech.geomesa.fs.storage.api.FileSystemContext
 import org.locationtech.geomesa.fs.storage.api.FileSystemStorage.FileSystemWriter
 import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserver
 import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserverFactory.NoOpObserver
+import org.locationtech.geomesa.fs.storage.common.utils.PathCache
 import org.locationtech.geomesa.fs.storage.orc.utils.OrcAttributeWriter
 import org.locationtech.geomesa.utils.io.CloseQuietly
 
 import scala.util.control.NonFatal
 
 class OrcFileSystemWriter(
     sft: SimpleFeatureType,
-    config: Configuration,
+    context: FileSystemContext,
     file: Path,
     observer: FileSystemObserver = NoOpObserver
   ) extends FileSystemWriter {
 
   private val schema = OrcFileSystemStorage.createTypeDescription(sft)
 
-  private val options = OrcFile.writerOptions(config).setSchema(schema)
+  private val options = OrcFile.writerOptions(context.conf).setSchema(schema)
   private val writer = OrcFile.createWriter(file, options)
   private val batch = schema.createRowBatch()
@@ -56,6 +57,7 @@ class OrcFileSystemWriter(
       case NonFatal(e) => CloseQuietly(Seq(writer, observer)).foreach(e.addSuppressed); throw e
     }
     CloseQuietly.raise(Seq(writer, observer))
+    PathCache.register(context.fs, file)
   }
 
   private def flushBatch(): Unit = {

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/test/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemWriterTest.scala
Lines changed: 14 additions & 17 deletions

@@ -13,6 +13,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.junit.runner.RunWith
 import org.locationtech.geomesa.features.ScalaSimpleFeature
+import org.locationtech.geomesa.fs.storage.api.FileSystemContext
 import org.locationtech.geomesa.utils.collection.SelfClosingIterator
 import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes
 import org.locationtech.geomesa.utils.io.WithClose
@@ -32,26 +33,22 @@ class OrcFileSystemWriterTest extends Specification {
     ScalaSimpleFeature.create(sft, "1", "name1", "1", "2017-01-01T00:00:01.000Z", "LINESTRING (10 1, 5 1)")
   )
 
-  val config = new Configuration()
-
   "OrcFileSystemWriter" should {
     "write and read simple features" in {
-
-      withPath { path =>
-        withTestFile { file =>
-          WithClose(new OrcFileSystemWriter(sft, config, file)) { writer => features.foreach(writer.write) }
-          val reader = new OrcFileSystemReader(sft, config, None, None)
-          val read = WithClose(reader.read(file)) { i => SelfClosingIterator(i).map(ScalaSimpleFeature.copy).toList }
-          read mustEqual features
-          // test out not calling 'hasNext'
-          var i = 0
-          WithClose(reader.read(file)) { iter =>
-            while (i < features.size) {
-              iter.next() mustEqual features(i)
-              i += 1
-            }
-            iter.next must throwA[NoSuchElementException]
+      withTestFile { file =>
+        val fc = FileSystemContext(file.getParent, new Configuration())
+        WithClose(new OrcFileSystemWriter(sft, fc, file)) { writer => features.foreach(writer.write) }
+        val reader = new OrcFileSystemReader(sft, fc.conf, None, None)
+        val read = WithClose(reader.read(file)) { i => SelfClosingIterator(i).map(ScalaSimpleFeature.copy).toList }
+        read mustEqual features
+        // test out not calling 'hasNext'
+        var i = 0
+        WithClose(reader.read(file)) { iter =>
+          while (i < features.size) {
+            iter.next() mustEqual features(i)
+            i += 1
          }
+          iter.next must throwA[NoSuchElementException]
        }
      }
    }

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/ParquetFileSystemStorage.scala
Lines changed: 28 additions & 13 deletions

@@ -13,6 +13,7 @@ import org.apache.parquet.hadoop.ParquetReader
 import org.apache.parquet.hadoop.example.GroupReadSupport
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.apache.parquet.example.data.Group
 import org.apache.parquet.filter2.compat.FilterCompat
 import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
 import org.geotools.api.filter.Filter
@@ -24,9 +25,12 @@ import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.File
 import org.locationtech.geomesa.fs.storage.common.jobs.StorageConfiguration
 import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserver
 import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserverFactory.NoOpObserver
+import org.locationtech.geomesa.fs.storage.common.utils.PathCache
 import org.locationtech.geomesa.fs.storage.parquet.ParquetFileSystemStorage.ParquetFileSystemWriter
 import org.locationtech.geomesa.utils.io.CloseQuietly
 
+import scala.util.control.NonFatal
+
 /**
  *
  * @param context file system context
@@ -35,11 +39,8 @@ import org.locationtech.geomesa.utils.io.CloseQuietly
 class ParquetFileSystemStorage(context: FileSystemContext, metadata: StorageMetadata)
     extends AbstractFileSystemStorage(context, metadata, ParquetFileSystemStorage.FileExtension) {
 
-  override protected def createWriter(file: Path, observer: FileSystemObserver): FileSystemWriter = {
-    val sftConf = new Configuration(context.conf)
-    StorageConfiguration.setSft(sftConf, metadata.sft)
-    new ParquetFileSystemWriter(metadata.sft, file, sftConf, observer)
-  }
+  override protected def createWriter(file: Path, observer: FileSystemObserver): FileSystemWriter =
+    new ParquetFileSystemWriter(metadata.sft, context, file, observer)
 
   override protected def createReader(
       filter: Option[Filter],
@@ -72,11 +73,17 @@ object ParquetFileSystemStorage extends LazyLogging {
 
   class ParquetFileSystemWriter(
       sft: SimpleFeatureType,
+      context: FileSystemContext,
       file: Path,
-      conf: Configuration,
       observer: FileSystemObserver = NoOpObserver
    ) extends FileSystemWriter {
 
+    private val conf = {
+      val conf = new Configuration(context.conf)
+      StorageConfiguration.setSft(conf, sft)
+      conf
+    }
+
     private val writer = SimpleFeatureParquetWriter.builder(file, conf).build()
 
     override def write(f: SimpleFeature): Unit = {
@@ -86,27 +93,35 @@ object ParquetFileSystemStorage extends LazyLogging {
     override def flush(): Unit = observer.flush()
     override def close(): Unit = {
       CloseQuietly(Seq(writer, observer)).foreach(e => throw e)
-      if (FileValidationEnabled.get.toBoolean) {
+      PathCache.register(context.fs, file)
+      if (FileValidationEnabled.toBoolean.get) {
         validateParquetFile(file)
       }
     }
   }
 
+  /**
+   * Validate a file by reading it back
+   *
+   * @param file file to validate
+   */
   def validateParquetFile(file: Path): Unit = {
-    val reader = ParquetReader.builder(new GroupReadSupport(), file).build()
-
+    var reader: ParquetReader[Group] = null
    try {
-      // Read Parquet file content
+      // read Parquet file content
+      reader = ParquetReader.builder(new GroupReadSupport(), file).build()
      var record = reader.read()
      while (record != null) {
        // Process the record
        record = reader.read()
      }
-      logger.debug(s"${file} is a valid Parquet file")
+      logger.trace(s"$file is a valid Parquet file")
    } catch {
-      case e: Exception => throw new RuntimeException(s"Unable to validate ${file}: File may be corrupted", e)
+      case NonFatal(e) => throw new RuntimeException(s"Unable to validate $file: File may be corrupted", e)
    } finally {
-      reader.close()
+      if (reader != null) {
+        reader.close()
+      }
    }
  }
}
