@@ -20,7 +20,8 @@ import org.apache.spark.sql.functions._
20
20
import org .apache .spark .sql .types .{DecimalType , LongType , StringType }
21
21
import org .apache .spark .sql .{Column , DataFrame }
22
22
import za .co .absa .atum .agent .core .MeasurementProcessor
23
- import za .co .absa .atum .agent .core .MeasurementProcessor .MeasurementFunction
23
+ import za .co .absa .atum .agent .core .MeasurementProcessor .{MeasurementFunction , ResultOfMeasurement }
24
+ import za .co .absa .atum .model .dto .MeasureResultDTO .ResultValueType
24
25
import za .co .absa .spark .commons .implicits .StructTypeImplicits .StructTypeEnhancements
25
26
26
27
/**
@@ -32,86 +33,109 @@ sealed trait Measure extends MeasurementProcessor with MeasureType {
32
33
33
34
/**
 * Static description shared by a measure and by the companion object that represents its kind.
 */
trait MeasureType {

  /** Name identifying the measure. */
  val measureName: String

  /** Type tag that accompanies the (string-encoded) value produced by the measure. */
  val resultValueType: ResultValueType.ResultValueType
}
37
38
38
39
object Measure {
39
40
40
41
// Column name referenced by the sum-based aggregate expressions built in this object.
// NOTE(review): presumably `aggregateColumn` exposes the control column under this name - confirm.
private val valueColumnName: String = "value"

/** Measure kinds known to this object, represented by their companion objects. */
val supportedMeasures: Seq[MeasureType] = Seq(
  RecordCount,
  DistinctRecordCount,
  SumOfValuesOfColumn,
  AbsSumOfValuesOfColumn,
  SumOfHashesOfColumn
)

/** Names of the supported measures, in the same order as `supportedMeasures`. */
val supportedMeasureNames: Seq[String] = supportedMeasures.map(_.measureName)
46
51
47
/**
 * Measure that counts the rows of a DataFrame after projecting it to `controlCol`.
 *
 * The constructor is private; instances are obtained through `RecordCount(controlCol)`.
 *
 * @param controlCol      name of the column selected before counting
 * @param measureName     name of the measure
 * @param resultValueType type tag attached to the produced result
 */
case class RecordCount private (
  controlCol: String,
  measureName: String,
  resultValueType: ResultValueType.ResultValueType
) extends Measure {

  /** Builds the function that yields the row count, rendered as a string, together with its type tag. */
  override def function: MeasurementFunction =
    (ds: DataFrame) => {
      val resultValue = ds.select(col(controlCol)).count().toString
      ResultOfMeasurement(resultValue, resultValueType)
    }
}
52
64
/** Companion describing the record-count measure kind and acting as its factory. */
object RecordCount extends MeasureType {

  /**
   * Creates a record-count measure for the given column.
   *
   * @param controlCol name of the column the measure is taken on
   * @return measure carrying this companion's name and result type
   */
  def apply(controlCol: String): RecordCount = RecordCount(controlCol, measureName, resultValueType)

  override val measureName: String = "count"
  override val resultValueType: ResultValueType.ResultValueType = ResultValueType.Long
}
60
70
61
/**
 * Measure that counts the distinct values of `controlCol` in a DataFrame.
 *
 * The constructor is private; instances are obtained through `DistinctRecordCount(controlCol)`.
 *
 * @param controlCol      name of the column whose distinct values are counted
 * @param measureName     name of the measure
 * @param resultValueType type tag attached to the produced result
 */
case class DistinctRecordCount private (
  controlCol: String,
  measureName: String,
  resultValueType: ResultValueType.ResultValueType
) extends Measure {

  /** Builds the function that yields the distinct count, rendered as a string, together with its type tag. */
  override def function: MeasurementFunction =
    (ds: DataFrame) => {
      val resultValue = ds.select(col(controlCol)).distinct().count().toString
      ResultOfMeasurement(resultValue, resultValueType)
    }
}
67
-
68
83
/** Companion describing the distinct-count measure kind and acting as its factory. */
object DistinctRecordCount extends MeasureType {

  /**
   * Creates a distinct-count measure for the given column.
   *
   * @param controlCol name of the column the measure is taken on
   * @return measure carrying this companion's name and result type
   */
  def apply(controlCol: String): DistinctRecordCount = {
    DistinctRecordCount(controlCol, measureName, resultValueType)
  }

  override val measureName: String = "distinctCount"
  override val resultValueType: ResultValueType.ResultValueType = ResultValueType.Long
}
76
91
77
/**
 * Measure built on a `sum` aggregate, evaluated for `controlCol` through `aggregateColumn`.
 *
 * The constructor is private; instances are obtained through `SumOfValuesOfColumn(controlCol)`.
 *
 * @param controlCol      name of the column handed to `aggregateColumn`
 * @param measureName     name of the measure
 * @param resultValueType type tag attached to the produced result
 */
case class SumOfValuesOfColumn private (
  controlCol: String,
  measureName: String,
  resultValueType: ResultValueType.ResultValueType
) extends Measure {

  /** Builds the function that aggregates the column and pairs the outcome with its type tag. */
  override def function: MeasurementFunction = (ds: DataFrame) => {
    // The aggregate refers to `valueColumnName`, not to `controlCol` directly.
    // NOTE(review): presumably `aggregateColumn` maps the control column to that name - confirm.
    val aggCol = sum(col(valueColumnName))
    val resultValue = aggregateColumn(ds, controlCol, aggCol)
    ResultOfMeasurement(resultValue, resultValueType)
  }
}
85
-
86
104
/** Companion describing the sum-of-values measure kind and acting as its factory. */
object SumOfValuesOfColumn extends MeasureType {

  /**
   * Creates a sum-of-values measure for the given column.
   *
   * @param controlCol name of the column the measure is taken on
   * @return measure carrying this companion's name and result type
   */
  def apply(controlCol: String): SumOfValuesOfColumn = {
    SumOfValuesOfColumn(controlCol, measureName, resultValueType)
  }

  override val measureName: String = "aggregatedTotal"
  override val resultValueType: ResultValueType.ResultValueType = ResultValueType.BigDecimal
}
94
112
95
/**
 * Measure built on a `sum(abs(...))` aggregate, evaluated for `controlCol` through `aggregateColumn`.
 *
 * The constructor is private; instances are obtained through `AbsSumOfValuesOfColumn(controlCol)`.
 *
 * @param controlCol      name of the column handed to `aggregateColumn`
 * @param measureName     name of the measure
 * @param resultValueType type tag attached to the produced result
 */
case class AbsSumOfValuesOfColumn private (
  controlCol: String,
  measureName: String,
  resultValueType: ResultValueType.ResultValueType
) extends Measure {

  /** Builds the function that aggregates the column and pairs the outcome with its type tag. */
  override def function: MeasurementFunction = (ds: DataFrame) => {
    // The aggregate refers to `valueColumnName`, not to `controlCol` directly.
    // NOTE(review): presumably `aggregateColumn` maps the control column to that name - confirm.
    val aggCol = sum(abs(col(valueColumnName)))
    val resultValue = aggregateColumn(ds, controlCol, aggCol)
    ResultOfMeasurement(resultValue, resultValueType)
  }
}
103
-
104
125
/** Companion describing the absolute-sum measure kind and acting as its factory. */
object AbsSumOfValuesOfColumn extends MeasureType {

  /**
   * Creates an absolute-sum measure for the given column.
   *
   * @param controlCol name of the column the measure is taken on
   * @return measure carrying this companion's name and result type
   */
  def apply(controlCol: String): AbsSumOfValuesOfColumn = {
    AbsSumOfValuesOfColumn(controlCol, measureName, resultValueType)
  }

  override val measureName: String = "absAggregatedTotal"
  // NOTE(review): SumOfValuesOfColumn declares BigDecimal for its result while this measure
  // declares Double - confirm the difference is intended.
  override val resultValueType: ResultValueType.ResultValueType = ResultValueType.Double
}
112
133
113
- case class SumOfHashesOfColumn private (controlCol : String , measureName : String , onlyForNumeric : Boolean )
114
- extends Measure {
134
+ case class SumOfHashesOfColumn private (
135
+ controlCol : String ,
136
+ measureName : String ,
137
+ resultValueType : ResultValueType .ResultValueType
138
+ ) extends Measure {
115
139
116
140
override def function : MeasurementFunction = (ds : DataFrame ) => {
117
141
@@ -120,17 +144,17 @@ object Measure {
120
144
.withColumn(aggregatedColumnName, crc32(col(controlCol).cast(" String" )))
121
145
.agg(sum(col(aggregatedColumnName)))
122
146
.collect()(0 )(0 )
123
- if (value == null ) " " else value.toString
147
+ val resultValue = if (value == null ) " " else value.toString
148
+ ResultOfMeasurement (resultValue, ResultValueType .String )
124
149
}
125
150
}
126
-
127
151
/** Companion describing the hash-sum measure kind and acting as its factory. */
object SumOfHashesOfColumn extends MeasureType {

  /**
   * Creates a hash-sum measure for the given column.
   *
   * @param controlCol name of the column the measure is taken on
   * @return measure carrying this companion's name and result type
   */
  def apply(controlCol: String): SumOfHashesOfColumn = {
    SumOfHashesOfColumn(controlCol, measureName, resultValueType)
  }

  override val measureName: String = "hashCrc32"
  override val resultValueType: ResultValueType.ResultValueType = ResultValueType.String
}
135
159
136
160
private def aggregateColumn (
0 commit comments