|
13 | 13 | from collections import defaultdict
|
14 | 14 |
|
15 | 15 | import pandas as pd
|
| 16 | +from pandas.api.types import is_string_dtype, is_numeric_dtype, is_bool_dtype, is_datetime64_ns_dtype, is_object_dtype |
16 | 17 | import numpy as np
|
17 | 18 | import ibm_db
|
18 | 19 |
|
@@ -228,6 +229,60 @@ def execute(self, df, start_ts=None, end_ts=None, entities=None, offset=None):
|
228 | 229 | # Aggregate
|
229 | 230 | if len(named_aggregations) > 0:
|
230 | 231 | agg_df_simple = groups.agg(**named_aggregations)
|
| 232 | + if agg_df_simple.empty: |
| 233 | + # Corrective action for unexpected behaviour of pandas: The user-defined (from the pandas perspective) |
| 234 | + # aggregation functions like our 'Count' function are never called when the corresponding dataframe is |
| 235 | + # empty. Although this behaviour (which was introduced in pandas 1.5.x) helps to avoid exceptions for |
| 236 | + # aggregation functions which are not designed to handle empty dataframes properly, it causes issues |
| 237 | + # for functions like 'Count': |
| 238 | + # The Count function returns a result of type 'int64' independent of the input type. This change of |
| 239 | + # type does not happen when our count function is never called. Therefore, pandas returns an (empty) |
| 240 | + # result column with the type of the input instead of type 'int64'. By the way, pandas' built-in count() |
| 241 | + # function does not show this issue! |
| 242 | + # We take corrective action for all simple aggregators by enforcing the correct result type in case of |
| 243 | + # an empty dataframe. |
| 244 | + |
| 245 | + # Determine the output metrics whose type must be changed. Then assign a dummy value of the expected |
| 246 | + # type to the corresponding columns in the dataframe to enforce the type conversion. This approach is |
| 247 | + # much simpler for an empty dataframe than using dataframe.astype(), which throws an exception, |
| 248 | + # for example, when a datetime64[ns] is converted to a float64. |
| 249 | + for output_metric_name in agg_df_simple.columns: |
| 250 | + output_metric = self.dms.data_items.get(output_metric_name) |
| 251 | + if output_metric is not None: |
| 252 | + output_metric_type = output_metric.get('columnType') |
| 253 | + column_type = agg_df_simple[output_metric_name].dtype |
| 254 | + new_type = None |
| 255 | + if output_metric_type == "BOOLEAN": |
| 256 | + if not is_bool_dtype(column_type): |
| 257 | + new_type = 'boolean' |
| 258 | + agg_df_simple[output_metric_name] = True |
| 259 | + elif output_metric_type == "NUMBER": |
| 260 | + if not is_numeric_dtype(column_type): |
| 261 | + new_type = 'float64' |
| 262 | + agg_df_simple[output_metric_name] = 1.0 |
| 263 | + elif output_metric_type == "LITERAL": |
| 264 | + if not is_string_dtype(column_type): |
| 265 | + new_type = 'str' |
| 266 | + agg_df_simple[output_metric_name] = "" |
| 267 | + elif output_metric_type == "TIMESTAMP": |
| 268 | + if not is_datetime64_ns_dtype(column_type): |
| 269 | + new_type = 'datetime64[ns]' |
| 270 | + agg_df_simple[output_metric_name] = pd.Timestamp(0) |
| 271 | + elif output_metric_type == "JSON": |
| 272 | + if not is_object_dtype(column_type): |
| 273 | + new_type = 'object' |
| 274 | + agg_df_simple[output_metric_name] = "" |
| 275 | + else: |
| 276 | + self.logger.warning(f"The output type could not be enforced for metric {output_metric_name} " |
| 277 | + f"because it has an unexpected type {output_metric_type}.") |
| 278 | + |
| 279 | + if new_type is not None: |
| 280 | + self.logger.debug(f"Type was changed from {column_type} to {new_type} for " |
| 281 | + f"output metric {output_metric_name}") |
| 282 | + else: |
| 283 | + self.logger.warning(f"The output type could not be enforced for metric {output_metric_name} " |
| 284 | + f"because it was not found in the list of defined metrics.") |
| 285 | + |
231 | 286 | log_data_frame('Data frame after application of simple aggregators', agg_df_simple)
|
232 | 287 |
|
233 | 288 | all_dfs.append(agg_df_simple)
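
For reference, a minimal standalone sketch (separate from the change itself) of the two pandas behaviours the added comments describe: a user-defined aggregator being skipped for an empty frame, and the scalar-assignment trick used to enforce the result dtype. The frame, the my_count helper and the column names are illustrative only; the observed dtypes assume pandas >= 1.5, as noted in the diff.

import numpy as np
import pandas as pd

# Empty input frame: the metric column is float64, the grouping key is 'id'.
df = pd.DataFrame({"id": pd.Series([], dtype="object"),
                   "value": pd.Series([], dtype="float64")})

def my_count(series):
    # Stand-in for the 'Count' aggregator: always returns int64,
    # independent of the input dtype.
    return np.int64(series.count())

# On pandas >= 1.5 a user-defined aggregator is not invoked for an empty
# frame, so the result column keeps the input dtype (float64) instead of int64.
agg = df.groupby("id").agg(value_count=("value", my_count))
print(agg.dtypes)

# The workaround from the diff: assigning a scalar to a column of an empty
# frame replaces the column and lets pandas infer the dtype from the scalar.
# This avoids astype(), which can raise, e.g. for datetime64[ns] -> float64.
agg["value_count"] = np.int64(1)
print(agg.dtypes)  # value_count is now int64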
|
|