Skip to content

Commit 5b5cdd6

Browse files
committed
#759 Add tests for self-checks.
1 parent f417d42 commit 5b5cdd6

File tree

3 files changed

+183
-0
lines changed

3 files changed

+183
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package za.co.absa.cobrix.spark.cobol.mocks
2+
3+
import za.co.absa.cobrix.cobol.reader.extractors.raw.{RawRecordContext, RawRecordExtractor}
4+
5+
/**
  * A test mock of a raw record extractor that treats the input as a stream of
  * fixed-size records, 2 bytes each.
  *
  * The header stream is not needed by this extractor, so it is closed immediately.
  */
class FixedRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRecordExtractor {
  ctx.headerStream.close()

  // Number of records produced so far, counted from the starting record
  // number the context was created with.
  private var recordNumber = ctx.startingRecordNumber

  override def offset: Long = ctx.inputStream.offset

  override def hasNext: Boolean = !ctx.inputStream.isEndOfStream

  @throws[NoSuchElementException]
  override def next(): Array[Byte] = {
    if (!hasNext) throw new NoSuchElementException

    val record = ctx.inputStream.next(2)

    recordNumber += 1
    record
  }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package za.co.absa.cobrix.spark.cobol.mocks
2+
3+
import za.co.absa.cobrix.cobol.reader.extractors.raw.{RawRecordContext, RawRecordExtractor}
4+
5+
/**
  * A test mock of a raw record extractor producing fixed-size 2-byte records
  * that is deliberately NOT index compatible.
  *
  * Records are fetched eagerly, so `offset` reports the offset of the record
  * that follows the current one instead of the current record's start offset.
  */
class FixedRecordExtractorNoIndex(ctx: RawRecordContext) extends Serializable with RawRecordExtractor {
  ctx.headerStream.close()

  private var currentOffset = ctx.inputStream.offset
  private var recordNumber = ctx.startingRecordNumber

  // The pre-read next record; None once the input stream is exhausted.
  private var currentRecord = fetchRecord()

  // Because of the eager pre-read above, this offset does not point to the
  // beginning of the current record, which is what breaks index support.
  override def offset: Long = currentOffset

  override def hasNext: Boolean = currentRecord.nonEmpty

  @throws[NoSuchElementException]
  override def next(): Array[Byte] = {
    if (!hasNext) throw new NoSuchElementException

    val record = currentRecord.get

    // To make this extractor index compatible, the next 2 lines would need
    // to be swapped (update the offset before pre-reading the next record).
    currentRecord = fetchRecord()
    currentOffset = ctx.inputStream.offset

    recordNumber += 1
    record
  }

  // Reads the next 2-byte record, or returns None at end of stream.
  def fetchRecord(): Option[Array[Byte]] = {
    if (ctx.inputStream.isEndOfStream)
      None
    else
      Option(ctx.inputStream.next(2))
  }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package za.co.absa.cobrix.spark.cobol.source.integration
18+
19+
import org.apache.spark.sql.DataFrame
20+
import org.scalatest.wordspec.AnyWordSpec
21+
import za.co.absa.cobrix.spark.cobol.mocks.CustomRecordExtractorMock
22+
import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase
23+
import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture
24+
25+
/**
  * Integration tests for the self-checks applied to custom raw record extractors.
  *
  * Uses two mock extractors: one whose reported offsets are index compatible
  * and one whose offsets are not.
  */
class Test39RecordExtractorSelfCheck extends AnyWordSpec with SparkTestBase with BinaryFileFixture {
  // A copybook with a single 2-byte field, matching the mock extractors'
  // fixed record size.
  private val copybook =
    """      01  R.
                03 A        PIC X(2).
    """

  // Six 2-byte records.
  private val data = "AABBCCDDEEFF"

  "Record extractor supporting indexes" should {
    "should work with indexes" in {
      val expected = """[{"A":"AA"},{"A":"BB"},{"A":"CC"},{"A":"DD"},{"A":"EE"},{"A":"FF"}]"""

      withTempBinFile("custom_re", ".dat", data.getBytes) { tmpFileName =>
        val df = getDataFrame(tmpFileName, Map(
          "record_extractor" -> "za.co.absa.cobrix.spark.cobol.mocks.FixedRecordExtractor",
          "input_split_records" -> "2")
        )

        val actual = df.toJSON.collect().mkString("[", ",", "]")

        assert(actual == expected)
      }
    }
  }

  "Record extractor not supporting indexes" should {
    "should fail self checks" ignore /* Not implemented yet */ {
      withTempBinFile("custom_re", ".dat", data.getBytes) { tmpFileName =>
        val df = getDataFrame(tmpFileName, Map(
          "record_extractor" -> "za.co.absa.cobrix.spark.cobol.mocks.FixedRecordExtractorNoIndex",
          "input_split_records" -> "2")
        )

        val ex = intercept[RuntimeException] {
          df.show(false)
          df.count()
        }

        assert(ex.getMessage == "")
      }
    }

    "should still work if self checks is turned off" in {
      withTempBinFile("custom_re", ".dat", data.getBytes) { tmpFileName =>
        val df = getDataFrame(tmpFileName, Map(
          "enable_self_checks" -> "false",
          "record_extractor" -> "za.co.absa.cobrix.spark.cobol.mocks.FixedRecordExtractorNoIndex",
          "input_split_records" -> "2")
        )

        // No guarantees regarding the correct record count at this point
        assert(df.count() > 4)
      }
    }

    "should still work if indexes are disabled" in {
      val expected = """[{"A":"AA"},{"A":"BB"},{"A":"CC"},{"A":"DD"},{"A":"EE"},{"A":"FF"}]"""

      withTempBinFile("custom_re", ".dat", data.getBytes) { tmpFileName =>
        val df = getDataFrame(tmpFileName, Map(
          "record_extractor" -> "za.co.absa.cobrix.spark.cobol.mocks.FixedRecordExtractorNoIndex",
          "enable_indexes" -> "false")
        )

        val actual = df.toJSON.collect().mkString("[", ",", "]")

        assert(actual == expected)
      }
    }
  }

  // Reads the given binary file with the `cobol` data source using the test
  // copybook, ASCII encoding, and any extra options supplied by the test.
  private def getDataFrame(inputPath: String, extraOptions: Map[String, String] = Map.empty[String, String]): DataFrame = {
    spark
      .read
      .format("cobol")
      .option("copybook_contents", copybook)
      .option("encoding", "ascii")
      .options(extraOptions)
      .load(inputPath)
  }
}

0 commit comments

Comments
 (0)