@@ -25,6 +25,8 @@ import org.apache.spark.sql.SQLContext
 import za.co.absa.cobrix.cobol.internal.Logging
 import za.co.absa.cobrix.cobol.reader.common.Constants
 import za.co.absa.cobrix.cobol.reader.index.entry.SparseIndexEntry
+import za.co.absa.cobrix.cobol.reader.stream.SimpleStream
+import za.co.absa.cobrix.cobol.reader.{VarLenNestedReader => ReaderVarLenNestedReader}
 import za.co.absa.cobrix.spark.cobol.reader.{Reader, VarLenReader}
 import za.co.absa.cobrix.spark.cobol.source.SerializableConfiguration
 import za.co.absa.cobrix.spark.cobol.source.parameters.LocalityParameters
@@ -111,10 +113,15 @@ private[source] object IndexBuilder extends Logging {
   private[cobol] def buildIndexForVarLenReader(filesList: Array[FileWithOrder],
                                                reader: VarLenReader,
                                                sqlContext: SQLContext): RDD[SparseIndexEntry] = {
-    val filesRDD = sqlContext.sparkContext.parallelize(filesList, filesList.length)
     val conf = sqlContext.sparkContext.hadoopConfiguration
     val sconf = new SerializableConfiguration(conf)

+    if (reader.getReaderProperties.enableSelfChecks && filesList.nonEmpty) {
+      selfCheckForIndexCompatibility(reader, filesList.head.filePath, conf)
+    }
+
+    val filesRDD = sqlContext.sparkContext.parallelize(filesList, filesList.length)
+
     val indexRDD = filesRDD.mapPartitions(
       partition => {
         partition.flatMap(row => {
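For context, the self-check wired in above is what produces the `'enable_indexes = false'` hint that appears in the error messages added further down in this diff. A minimal sketch of how a job might switch indexing off when a custom record extractor turns out not to support it; the paths and the extractor class name are placeholders, and all option names except `enable_indexes` (which comes from the error message in this change) are assumed from typical spark-cobol usage:

import org.apache.spark.sql.SparkSession

// Hypothetical job that disables sparse index generation for an extractor
// that fails the self-check. Paths and the extractor class are placeholders.
object DisableIndexesExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("cobrix-disable-indexes")
      .getOrCreate()

    val df = spark.read
      .format("cobol")
      .option("copybook", "/path/to/copybook.cpy")            // placeholder path
      .option("record_extractor", "com.example.MyExtractor")  // placeholder class, assumed option name
      .option("enable_indexes", "false")                      // skip sparse index generation entirely
      .load("/path/to/data")                                  // placeholder path

    df.show(false)
  }
}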
@@ -149,36 +156,114 @@ private[source] object IndexBuilder extends Logging {
                                              config: Configuration,
                                              reader: VarLenReader): ArrayBuffer[SparseIndexEntry] = {
     val filePath = fileWithOrder.filePath
-    val path = new Path(filePath)
     val fileOrder = fileWithOrder.order
+    val startOffset = reader.getReaderProperties.fileStartOffset
+    val endOffset = reader.getReaderProperties.fileEndOffset
+
+    logger.info(s"Going to generate index for the file: $filePath")
+
+    val (inputStream, headerStream, maximumBytes) = getStreams(filePath, startOffset, endOffset, config)
+    val index = reader.generateIndex(inputStream, headerStream, fileOrder, reader.isRdwBigEndian)
+
+    val indexWithEndOffset = if (maximumBytes > 0) {
+      index.map(entry => if (entry.offsetTo == -1) entry.copy(offsetTo = startOffset + maximumBytes) else entry)
+    } else {
+      index
+    }
+
+    indexWithEndOffset
+  }
+
+  private[cobol] def getStreams(filePath: String,
+                                fileStartOffset: Long,
+                                fileEndOffset: Long,
+                                config: Configuration,
+                               ): (SimpleStream, SimpleStream, Long) = {
+    val path = new Path(filePath)
     val fileSystem = path.getFileSystem(config)

-    val startOffset = reader.getReaderProperties.fileStartOffset
-    val maximumBytes = if (reader.getReaderProperties.fileEndOffset == 0) {
+    val startOffset = fileStartOffset
+    val maximumBytes = if (fileEndOffset == 0) {
       0
     } else {
-      val bytesToRead = fileSystem.getContentSummary(path).getLength - reader.getReaderProperties.fileEndOffset - startOffset
+      val bytesToRead = fileSystem.getContentSummary(path).getLength - fileEndOffset - startOffset
       if (bytesToRead < 0)
         0
       else
         bytesToRead
     }

-    logger.info(s"Going to generate index for the file: $filePath")
     val inputStream = new FileStreamer(filePath, fileSystem, startOffset, maximumBytes)
     val headerStream = new FileStreamer(filePath, fileSystem)
-    val index = reader.generateIndex(inputStream, headerStream,
-      fileOrder, reader.isRdwBigEndian)

-    val indexWithEndOffset = if (maximumBytes > 0) {
-      index.map(entry => if (entry.offsetTo == -1) entry.copy(offsetTo = startOffset + maximumBytes) else entry)
-    } else {
-      index
-    }
-
-    indexWithEndOffset
+    (inputStream, headerStream, maximumBytes)
   }

+  private[cobol] def selfCheckForIndexCompatibility(reader: VarLenReader, filePath: String, config: Configuration): Unit = {
+    if (!reader.isInstanceOf[ReaderVarLenNestedReader[_]])
+      return
+
+    val readerProperties = reader.getReaderProperties
+
+    val startOffset = readerProperties.fileStartOffset
+    val endOffset = readerProperties.fileEndOffset
+
+    readerProperties.recordExtractor.foreach { recordExtractorClass =>
+      val (dataStream, headerStream, _) = getStreams(filePath, startOffset, endOffset, config)
+
+      val extractorOpt = reader.asInstanceOf[ReaderVarLenNestedReader[_]].recordExtractor(0, dataStream, headerStream)
+
+      var offset = -1L
+      var record = Array[Byte]()
+
+      extractorOpt.foreach { extractor =>
+        if (extractor.hasNext) {
+          // Getting the first record, if available
+          extractor.next()
+          offset = extractor.offset // Saving offset to jump to later
+
+          if (extractor.hasNext) {
+            // Getting the second record, if available
+            record = extractor.next() // Saving the record to check later
+
+            dataStream.close()
+            headerStream.close()
+
+            // Getting new streams and a record extractor that points directly to the second record
+            val (dataStream2, headerStream2, _) = getStreams(filePath, offset, endOffset, config)
+            val extractorOpt2 = reader.asInstanceOf[ReaderVarLenNestedReader[_]].recordExtractor(1, dataStream2, headerStream2)
+
+            extractorOpt2.foreach { extractor2 =>
+              if (!extractor2.hasNext) {
+                // If the extractor refuses to return the second record, it is obviously faulty in terms of indexing support.
+                throw new RuntimeException(
+                  s"Record extractor self-check failed. When reading from a non-zero offset the extractor returned hasNext()=false. " +
+                    "Please, use 'enable_indexes = false'. " +
+                    s"File: $filePath, offset: $offset"
+                )
+              }
+
+              // Getting the second record from the extractor pointing to the second record offset at the start.
+              val expectedRecord = extractor2.next()
+
+              if (!expectedRecord.sameElements(record)) {
+                // Records should match. If they don't, the record extractor is faulty in terms of indexing support.
+                throw new RuntimeException(
+                  s"Record extractor self-check failed. The record extractor returned a wrong record when started from a non-zero offset. " +
+                    "Please, use 'enable_indexes = false'. " +
+                    s"File: $filePath, offset: $offset"
+                )
+              } else {
+                logger.info(s"Record extractor self-check passed. File: $filePath, offset: $offset")
+              }
+              dataStream2.close()
+              headerStream2.close()
+            }
+          }
+        }
+      }
+    }
+  }

   private[cobol] def getBlockLengthByIndexEntry(entry: SparseIndexEntry): Long = {
     val indexedLength = if (entry.offsetTo - entry.offsetFrom > 0)
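The self-check added above boils down to one invariant: an extractor that supports indexing must return the same second record whether it reads the file sequentially from the start or is re-created directly at the offset where that record begins. Below is a standalone sketch of that idea; it deliberately uses a plain byte array and a toy fixed-length "extractor" instead of Cobrix's SimpleStream and record-extractor interfaces, so nothing here is the actual Cobrix API.

// Standalone illustration of the indexing self-check invariant (not Cobrix API).
object SelfCheckSketch {
  // Reads 4-byte records from `data` starting at `startOffset`.
  final class FixedLengthExtractor(data: Array[Byte], startOffset: Int) extends Iterator[Array[Byte]] {
    private var pos = startOffset
    def offset: Int = pos
    def hasNext: Boolean = pos + 4 <= data.length
    def next(): Array[Byte] = { val r = data.slice(pos, pos + 4); pos += 4; r }
  }

  def main(args: Array[String]): Unit = {
    val data = (0 until 16).map(_.toByte).toArray

    // Pass 1: read the first record, remember where the second one starts, then read it.
    val sequential = new FixedLengthExtractor(data, 0)
    sequential.next()                    // first record
    val secondOffset = sequential.offset // offset of the second record
    val secondRecord = sequential.next() // second record, read sequentially

    // Pass 2: start a fresh extractor directly at that offset, the way index-based
    // reading would, and require that the same record comes back.
    val fromOffset = new FixedLengthExtractor(data, secondOffset)
    val recheck = fromOffset.next()

    require(recheck.sameElements(secondRecord),
      "Extractor is not index-compatible: records differ when starting mid-file")
    println(s"Self-check passed at offset $secondOffset")
  }
}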