Commit c9c3986

#763 Implement relaxed type restrictions on which field can be used for record length.
1 parent 77dc2fd commit c9c3986

File tree

7 files changed: +161 −40 lines changed

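For orientation, here is a minimal user-facing sketch of what the relaxed restriction allows, assuming the options exercised by the new tests below; the SparkSession setup, object name, and file path are placeholders and are not part of the commit. A record length field that is not strictly integral — here an alphanumeric PIC X(1) field holding digits — can now be used directly; values that do not parse as numbers fail at read time instead of being rejected up front by the validator.

import org.apache.spark.sql.SparkSession

object RelaxedRecordLengthExample {
  def main(args: Array[String]): Unit = {
    // Local session for illustration only.
    val spark = SparkSession.builder()
      .appName("relaxed-record-length")
      .master("local[*]")
      .getOrCreate()

    // LEN is alphanumeric, not integral; after this commit it is accepted as the
    // record length field as long as its values parse as numbers at runtime.
    val copybook =
      """         01  R.
                  05  LEN      PIC X(1).
                  05  FIELD1   PIC X(1).
      """

    val df = spark.read
      .format("cobol")
      .option("copybook_contents", copybook)
      .option("record_format", "F")
      .option("record_length_field", "LEN")
      .option("pedantic", "true")
      .load("/path/to/ebcdic/file.dat") // placeholder path

    df.show()
  }
}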

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/asttransform/DependencyMarker.scala

Lines changed: 2 additions & 1 deletion
@@ -18,7 +18,7 @@ package za.co.absa.cobrix.cobol.parser.asttransform

 import org.slf4j.LoggerFactory
 import za.co.absa.cobrix.cobol.parser.CopybookParser.CopybookAST
-import za.co.absa.cobrix.cobol.parser.ast.datatype.Integral
+import za.co.absa.cobrix.cobol.parser.ast.datatype.{Decimal, Integral}
 import za.co.absa.cobrix.cobol.parser.ast.{Group, Primitive, Statement}

 import scala.collection.mutable
@@ -96,6 +96,7 @@ class DependencyMarker(
       val newPrimitive = if (dependees contains primitive) {
         primitive.dataType match {
           case _: Integral => true
+          case d: Decimal if d.scale == 0 => true
           case dt =>
             for (stmt <- dependees(primitive)) {
               if (stmt.dependingOnHandlers.isEmpty)
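In effect, the dependency marker now accepts a decimal field with no fractional digits (scale 0) as an OCCURS ... DEPENDING ON driver, the same way it accepts integral fields. Below is a stand-alone sketch of the relaxed acceptance rule; the helper name is hypothetical and it assumes the parser's CobolType base trait, while the real check lives inline in DependencyMarker.

import za.co.absa.cobrix.cobol.parser.ast.datatype.{CobolType, Decimal, Integral}

// Hypothetical helper: a field may drive OCCURS ... DEPENDING ON if it is an
// integral type or a decimal with zero scale; other types need an explicit
// dependingOnHandler to be registered.
def canDriveOccurs(dataType: CobolType): Boolean = dataType match {
  case _: Integral                => true
  case d: Decimal if d.scale == 0 => true
  case _                          => false
}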

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedWithRecordLengthExprRawRecordExtractor.scala

Lines changed: 14 additions & 10 deletions
@@ -22,6 +22,8 @@ import za.co.absa.cobrix.cobol.reader.iterator.RecordLengthExpression
 import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
 import za.co.absa.cobrix.cobol.reader.validator.ReaderParametersValidator

+import scala.util.Try
+
 class FixedWithRecordLengthExprRawRecordExtractor(ctx: RawRecordContext,
                                                   readerProperties: ReaderParameters) extends Serializable with RawRecordExtractor {
   private val log = LoggerFactory.getLogger(this.getClass)
@@ -121,19 +123,21 @@ class FixedWithRecordLengthExprRawRecordExtractor(ctx: RawRecordContext,
   final private def getRecordLengthFromField(lengthAST: Primitive, binaryDataStart: Array[Byte]): Int = {
     val length = if (isLengthMapEmpty) {
       ctx.copybook.extractPrimitiveField(lengthAST, binaryDataStart, readerProperties.startOffset) match {
-        case i: Int => i
-        case l: Long => l.toInt
-        case s: String => s.toInt
-        case null => throw new IllegalStateException(s"Null encountered as a record length field (offset: $byteIndex, raw value: ${getBytesAsHexString(binaryDataStart)}).")
-        case _ => throw new IllegalStateException(s"Record length value of the field ${lengthAST.name} must be an integral type.")
+        case i: Int => i
+        case l: Long => l.toInt
+        case s: String => Try{ s.toInt }.getOrElse(throw new IllegalStateException(s"Record length value of the field ${lengthAST.name} must be an integral type, encountered: '$s'."))
+        case d: BigDecimal => d.toInt
+        case null => throw new IllegalStateException(s"Null encountered as a record length field (offset: $byteIndex, raw value: ${getBytesAsHexString(binaryDataStart)}).")
+        case _ => throw new IllegalStateException(s"Record length value of the field ${lengthAST.name} must be an integral type.")
       }
     } else {
       ctx.copybook.extractPrimitiveField(lengthAST, binaryDataStart, readerProperties.startOffset) match {
-        case i: Int => getRecordLengthFromMapping(i.toString)
-        case l: Long => getRecordLengthFromMapping(l.toString)
-        case s: String => getRecordLengthFromMapping(s)
-        case null => defaultRecordLength.getOrElse(throw new IllegalStateException(s"Null encountered as a record length field (offset: $byteIndex, raw value: ${getBytesAsHexString(binaryDataStart)})."))
-        case _ => throw new IllegalStateException(s"Record length value of the field ${lengthAST.name} must be an integral type.")
+        case i: Int => getRecordLengthFromMapping(i.toString)
+        case l: Long => getRecordLengthFromMapping(l.toString)
+        case d: BigDecimal => getRecordLengthFromMapping(d.toString())
+        case s: String => getRecordLengthFromMapping(s)
+        case null => defaultRecordLength.getOrElse(throw new IllegalStateException(s"Null encountered as a record length field (offset: $byteIndex, raw value: ${getBytesAsHexString(binaryDataStart)})."))
+        case _ => throw new IllegalStateException(s"Record length value of the field ${lengthAST.name} must be an integral type.")
       }
     }
     length + recordLengthAdjustment
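With this change the decoded length value may also be a BigDecimal (the type produced when strict_integral_precision is enabled), and string values are converted defensively instead of failing with a raw NumberFormatException. A minimal sketch of the coercion as a stand-alone function follows; the helper name is hypothetical, since the commit implements this inline in getRecordLengthFromField for the case where no record length mapping is configured.

import scala.util.Try

// Hypothetical stand-alone version of the value-to-length coercion.
def toRecordLength(fieldName: String, value: Any): Int = value match {
  case i: Int        => i
  case l: Long       => l.toInt
  case d: BigDecimal => d.toInt // e.g. values decoded under strict_integral_precision
  case s: String     =>
    Try(s.toInt).getOrElse(throw new IllegalStateException(
      s"Record length value of the field $fieldName must be an integral type, encountered: '$s'."))
  case null          =>
    throw new IllegalStateException(s"Null encountered as a record length field $fieldName.")
  case _             =>
    throw new IllegalStateException(s"Record length value of the field $fieldName must be an integral type.")
}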

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/validator/ReaderParametersValidator.scala

Lines changed: 3 additions & 1 deletion
@@ -16,6 +16,7 @@

 package za.co.absa.cobrix.cobol.reader.validator

+import org.slf4j.LoggerFactory
 import za.co.absa.cobrix.cobol.parser.Copybook
 import za.co.absa.cobrix.cobol.parser.ast.Primitive
 import za.co.absa.cobrix.cobol.parser.expression.NumberExprEvaluator
@@ -25,6 +26,7 @@ import za.co.absa.cobrix.cobol.reader.parameters.MultisegmentParameters
 import scala.util.Try

 object ReaderParametersValidator {
+  private val log = LoggerFactory.getLogger(this.getClass)

   def getEitherFieldAndExpression(fieldOrExpressionOpt: Option[String], recordLengthMap: Map[String, Int], cobolSchema: Copybook): (Option[RecordLengthField], Option[RecordLengthExpression]) = {
     fieldOrExpressionOpt match {
@@ -49,7 +51,7 @@ object ReaderParametersValidator {
     val astNode = field match {
       case s: Primitive =>
         if (!s.dataType.isInstanceOf[za.co.absa.cobrix.cobol.parser.ast.datatype.Integral] && recordLengthMap.isEmpty) {
-          throw new IllegalStateException(s"The record length field $recordLengthFieldName must be an integral type or a value mapping must be specified.")
+          log.warn(s"The record length field $recordLengthFieldName is not integral. Runtime exceptions could occur if values can't be parsed as numbers.")
         }
         if (s.occurs.isDefined && s.occurs.get > 1) {
           throw new IllegalStateException(s"The record length field '$recordLengthFieldName' cannot be an array.")

cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/iterator/VRLRecordReaderSpec.scala

Lines changed: 0 additions & 26 deletions
@@ -200,32 +200,6 @@ class VRLRecordReaderSpec extends AnyWordSpec {
       assert(record2(14) == 0xF8.toByte)
     }

-    "throw an exception on a fraction type" in {
-      val copybookWithFieldLength =
-        """      01 RECORD.
-                   05 LEN        PIC 9(8)V99.
-                   05 N          PIC 9(2).
-                   05 A          PIC X(2).
-        """
-
-      val records = Array[Byte](0x00)
-      val streamH = new ByteStreamMock(records)
-      val streamD = new ByteStreamMock(records)
-      val context = RawRecordContext(0, streamH, streamD, CopybookParser.parseSimple(copybookWithFieldLength), null, null, "")
-
-      val readerParameters = ReaderParameters(lengthFieldExpression = Some("LEN"))
-
-      val ex = intercept[IllegalStateException] {
-        getUseCase(
-          copybook = copybookWithFieldLength,
-          records = records,
-          lengthFieldExpression = Some("LEN"),
-          recordExtractor = Some(new FixedWithRecordLengthExprRawRecordExtractor(context, readerParameters)))
-      }
-
-      assert(ex.getMessage == "The record length field LEN must be an integral type or a value mapping must be specified.")
-    }
-
     "the length mapping with default record length" in {
       val copybookWithLenbgthMap =
         """      01 RECORD.

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test37RecordLengthMappingSpec.scala

Lines changed: 21 additions & 0 deletions
@@ -106,6 +106,27 @@ class Test37RecordLengthMappingSpec extends AnyWordSpec with SparkTestBase with
       }
     }

+    "work for numeric mappings and strict integrals" in {
+      withTempBinFile("record_length_mapping", ".tmp", dataNumeric) { tempFile =>
+        val expected = """{"SEG_ID":"1","TEXT":"123"},{"SEG_ID":"2","TEXT":"123456"},{"SEG_ID":"3","TEXT":"1234567"}"""
+
+        val df = spark.read
+          .format("cobol")
+          .option("copybook_contents", copybook)
+          .option("record_format", "F")
+          .option("record_length_field", "SEG-ID")
+          .option("input_split_records", "2")
+          .option("pedantic", "true")
+          .option("record_length_map", """{"1":4,"2":7,"3":8}""")
+          .option("strict_integral_precision", "true")
+          .load(tempFile)
+
+        val actual = df.orderBy("SEG_ID").toJSON.collect().mkString(",")
+
+        assert(actual == expected)
+      }
+    }
+
     "work for data with offsets" in {
       withTempBinFile("record_length_mapping", ".tmp", dataWithFileOffsets) { tempFile =>
         val expected = """{"SEG_ID":"A","TEXT":"123"},{"SEG_ID":"B","TEXT":"123456"},{"SEG_ID":"C","TEXT":"1234567"}"""

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test21VariableOccursForTextFiles.scala

Lines changed: 35 additions & 0 deletions
@@ -228,5 +228,40 @@ class Test21VariableOccursForTextFiles extends AnyWordSpec with SparkTestBase wi
         assertEqualsMultiline(actualData, expectedData)
       }
     }
+
+    "correctly keep occurs for Cobrix ASCII with variable length extractor and decimal depending field" in {
+      val expectedSchema =
+        """root
+          | |-- COUNT: decimal(1,0) (nullable = true)
+          | |-- GROUP: array (nullable = true)
+          | |    |-- element: struct (containsNull = true)
+          | |    |    |-- INNER_COUNT: decimal(1,0) (nullable = true)
+          | |    |    |-- INNER_GROUP: array (nullable = true)
+          | |    |    |    |-- element: struct (containsNull = true)
+          | |    |    |    |    |-- FIELD: string (nullable = true)
+          | |-- MARKER: string (nullable = true)
+          |""".stripMargin
+
+      withTempTextFile("variable_occurs_ascii", ".dat", StandardCharsets.US_ASCII, data) { tmpFileName =>
+        val df = spark
+          .read
+          .format("cobol")
+          .option("copybook_contents", copybook)
+          .option("record_format", "D")
+          .option("ascii_charset", "utf8")
+          .option("variable_size_occurs", "true")
+          .option("strict_integral_precision", "true")
+          .option("pedantic", "true")
+          .load(tmpFileName)
+
+        val actualSchema = df.schema.treeString
+
+        assertEqualsMultiline(actualSchema, expectedSchema)
+
+        val actualData = SparkUtils.prettyJSON(df.toJSON.collect().mkString("[", ",", "]"))
+
+        assertEqualsMultiline(actualData, expectedData)
+      }
+    }
   }
 }

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/regression/Test26FixLengthWithIdGeneration.scala

Lines changed: 86 additions & 2 deletions
@@ -16,6 +16,7 @@

 package za.co.absa.cobrix.spark.cobol.source.regression

+import org.apache.spark.SparkException
 import org.scalatest.wordspec.AnyWordSpec
 import org.slf4j.{Logger, LoggerFactory}
 import za.co.absa.cobrix.spark.cobol.source.base.{SimpleComparisonBase, SparkTestBase}
@@ -143,7 +144,7 @@ class Test26FixLengthWithIdGeneration extends AnyWordSpec with SparkTestBase wit

   "EBCDIC files" should {
     "correctly work with segment id generation option with length field" in {
-      withTempBinFile("fix_length_reg", ".dat", binFileContentsLengthField) { tmpFileName =>
+      withTempBinFile("fix_length_reg1", ".dat", binFileContentsLengthField) { tmpFileName =>
         val df = spark
           .read
           .format("cobol")
@@ -168,7 +169,7 @@ class Test26FixLengthWithIdGeneration extends AnyWordSpec with SparkTestBase wit
     }

     "correctly work with segment id generation option with length expression" in {
-      withTempBinFile("fix_length_reg", ".dat", binFileContentsLengthExpr) { tmpFileName =>
+      withTempBinFile("fix_length_reg2", ".dat", binFileContentsLengthExpr) { tmpFileName =>
         val df = spark
           .read
           .format("cobol")
@@ -192,4 +193,87 @@ class Test26FixLengthWithIdGeneration extends AnyWordSpec with SparkTestBase wit
       }
     }
   }
+
+  "correctly work with segment id generation option with length field" in {
+    withTempBinFile("fix_length_reg3", ".dat", binFileContentsLengthField) { tmpFileName =>
+      val df = spark
+        .read
+        .format("cobol")
+        .option("copybook_contents", copybook)
+        .option("record_format", "F")
+        .option("record_length_field", "LEN")
+        .option("strict_integral_precision", "true")
+        .option("segment_field", "IND")
+        .option("segment_id_prefix", "ID")
+        .option("segment_id_level0", "A")
+        .option("segment_id_level1", "_")
+        .option("redefine-segment-id-map:0", "SEGMENT1 => A")
+        .option("redefine-segment-id-map:1", "SEGMENT2 => B")
+        .option("redefine-segment-id-map:2", "SEGMENT3 => C")
+        .option("input_split_records", 1)
+        .option("pedantic", "true")
+        .load(tmpFileName)

+      val actual = SparkUtils.convertDataFrameToPrettyJSON(df.drop("LEN").orderBy("Seg_Id0", "Seg_Id1"))
+
+      assertEqualsMultiline(actual, expected)
+    }
+  }
+
+  "work with string values" in {
+    val copybook =
+      """         01 R.
+                  05 LEN      PIC X(1).
+                  05 FIELD1   PIC X(1).
+      """
+
+    val binFileContentsLengthField: Array[Byte] = Array[Byte](
+      // A1
+      0xF2.toByte, 0xF3.toByte, 0xF3.toByte, 0xF4.toByte
+    ).map(_.toByte)
+
+    withTempBinFile("fix_length_str", ".dat", binFileContentsLengthField) { tmpFileName =>
+      val df = spark
+        .read
+        .format("cobol")
+        .option("copybook_contents", copybook)
+        .option("record_format", "F")
+        .option("record_length_field", "LEN")
+        .option("pedantic", "true")
+        .load(tmpFileName)
+
+      assert(df.count() == 2)
+    }
+  }
+
+  "fail for incorrect string values" in {
+    val copybook =
+      """         01 R.
+                  05 LEN      PIC X(1).
+                  05 FIELD1   PIC X(1).
+      """
+
+    val binFileContentsLengthField: Array[Byte] = Array[Byte](
+      // A1
+      0xF2.toByte, 0xF3.toByte, 0xC3.toByte, 0xF4.toByte
+    ).map(_.toByte)
+
+    withTempBinFile("fix_length_str", ".dat", binFileContentsLengthField) { tmpFileName =>
+      val df = spark
+        .read
+        .format("cobol")
+        .option("copybook_contents", copybook)
+        .option("record_format", "F")
+        .option("record_length_field", "LEN")
+        .option("pedantic", "true")
+        .load(tmpFileName)
+
+      val ex = intercept[SparkException] {
+        df.count()
+      }
+
+      assert(ex.getCause.getMessage.contains("Record length value of the field LEN must be an integral type, encountered: 'C'"))
+    }
+  }
+
 }
