Description
The problem:
As part of the training process, I save the generated features as a PySpark DataFrame (train_features_df). At inference time, I call tsfresh's feature_extraction.settings.from_columns function on the column names of train_features_df to recover the set of features to generate per column for the inference data:
columns_of_interest = [
    "RoomTemp",
    "CoilTemp",
    "FanRelay"
]
train_features_df = spark.read.format("parquet").load(<PATH>)
train_features_pdf = train_features_df.toPandas()
train_features_pdf = train_features_pdf.drop(columns=["id"])
features = train_features_pdf.columns.tolist()
train_kind_to_fc_parameters = tsfresh.feature_extraction.settings.from_columns(features)
# Inference data consists of last 24 hours worth of telemetry
inference_features_df = generate_features(inference_df, columns_of_interest, "normalized",
                                          train_kind_to_fc_parameters)
inference_features_pdf = inference_features_df.toPandas()
inference_features_pdf = inference_features_pdf.drop(columns=["id"])
inference_features = inference_features_pdf.columns.tolist()
inference_kind_to_fc_parameters = tsfresh.feature_extraction.settings.from_columns(inference_features)
print(inference_kind_to_fc_parameters == train_kind_to_fc_parameters)  # Prints False
I use the function below to generate features:
import pandas as pd
from pyspark.sql.functions import PandasUDFType, col, expr, pandas_udf
from tsfresh import extract_features
from tsfresh.feature_extraction import EfficientFCParameters

def generate_features(filtered_combined_df, columns_of_interest, prefix, fc_parameters=None):
    @pandas_udf("id string, features map<string, double>", PandasUDFType.GROUPED_MAP)
    def extract_tsfresh_features(pdf):
        # Runs once per id; pdf holds the long-format rows (time, id, kind, value) of that id.
        if not fc_parameters:
            extracted_features = extract_features(pdf,
                                                  column_id='id', column_sort='time',
                                                  column_kind='kind', column_value='value',
                                                  default_fc_parameters=EfficientFCParameters(),
                                                  disable_progressbar=True)
        else:
            extracted_features = extract_features(pdf,
                                                  column_id='id', column_sort='time',
                                                  column_kind='kind', column_value='value',
                                                  kind_to_fc_parameters=fc_parameters,
                                                  disable_progressbar=True)
        # One output row per id, with all features packed into a single map column.
        result_pdf = pd.DataFrame({
            "id": extracted_features.index,
            "features": extracted_features.to_dict(orient="records")
        })
        return result_pdf

    # Unpivot the wide telemetry columns into long format: one (kind, value) row per column.
    stack_expr = ", ".join([f"'{col_name}', cast({col_name} as string)" for col_name in columns_of_interest])
    df_pivot = filtered_combined_df.selectExpr(
        "time", "UUID",
        f"stack({len(columns_of_interest)}, {stack_expr}) as (kind, value)"
    )
    # Rows whose value is null after the cast are dropped before feature extraction.
    df_pivot = df_pivot.withColumn("value", col("value").cast("float")) \
        .withColumnRenamed("UUID", "id").where(col("value").isNotNull())
    features_df = df_pivot.groupby("id").apply(extract_tsfresh_features)

    # Output column names are read from the feature map of a single row.
    first_row_df = features_df.limit(1).selectExpr("explode(features) as (key, value)")
    keys = [row['key'] for row in first_row_df.collect()]
    select_exprs = [col("id")] + [expr(f"features['{key}']").alias(f"{prefix}_{key}") for key in keys]
    features_pivoted_df = features_df.select(*select_exprs)
    print("Features generated successfully.")
    return features_pivoted_df
The features generated for the inference data differ slightly from those in the training data.
When I compare inference_kind_to_fc_parameters with train_kind_to_fc_parameters, inference_kind_to_fc_parameters has no entry for the FanRelay column. How do I fix the mismatch between the features generated during the training and inference stages?
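To narrow down a mismatch like this, it may help to see exactly which kinds and which feature calculators differ, rather than only that the two dictionaries compare unequal. The sketch below is a minimal pure-Python helper for that. It assumes both arguments have the shape from_columns returns (a dict mapping each kind to a dict keyed by calculator name) and compares calculator names only, not their parameter lists. The function name diff_fc_parameters and the sample values are hypothetical and are not output from the pipeline above.

```python
def diff_fc_parameters(train_params, inference_params):
    """Summarize how two kind_to_fc_parameters-style dicts differ.

    Only kind names and calculator names are compared; calculator
    parameter lists are ignored in this sketch.
    """
    train_kinds = set(train_params)
    inference_kinds = set(inference_params)
    report = {
        "kinds_missing_in_inference": sorted(train_kinds - inference_kinds),
        "kinds_extra_in_inference": sorted(inference_kinds - train_kinds),
        "calculators_missing_in_inference": {},
    }
    # For kinds present on both sides, list calculators seen only in training.
    for kind in sorted(train_kinds & inference_kinds):
        missing = sorted(set(train_params[kind]) - set(inference_params[kind]))
        if missing:
            report["calculators_missing_in_inference"][kind] = missing
    return report


# Hypothetical stand-in values, NOT real output from the pipeline above.
train_example = {
    "RoomTemp": {"mean": None, "maximum": None},
    "FanRelay": {"mean": None},
}
inference_example = {
    "RoomTemp": {"mean": None},
}

report = diff_fc_parameters(train_example, inference_example)
print(report)
```

In the real pipeline the call would be diff_fc_parameters(train_kind_to_fc_parameters, inference_kind_to_fc_parameters).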
Anything else we need to know?:
Note: The training process consumes more than one year's worth of telemetry, whereas inference looks only at the last 24 hours' worth. I also checked the FanRelay column in the inference data, and all of its values are floats.
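One thing that may be worth checking, as an assumption rather than a confirmed cause: generate_features takes its output column names from features_df.limit(1), that is, from the feature map of a single id. If that id happens to have no non-null FanRelay rows in the 24-hour window, its map would contain no FanRelay keys, and those columns would be left out for every id. The pure-Python sketch below illustrates the effect on plain dicts. The helper names and sample maps are hypothetical.

```python
def keys_from_first_record(feature_maps):
    """Mimic reading column names from the first record only."""
    return sorted(feature_maps[0].keys()) if feature_maps else []


def keys_from_all_records(feature_maps):
    """Collect column names from every record (union of keys)."""
    all_keys = set()
    for feature_map in feature_maps:
        all_keys.update(feature_map.keys())
    return sorted(all_keys)


# Hypothetical feature maps for two ids; the first id has no FanRelay rows.
example_maps = [
    {"RoomTemp__mean": 21.5, "CoilTemp__mean": 8.0},
    {"RoomTemp__mean": 22.0, "CoilTemp__mean": 7.5, "FanRelay__mean": 1.0},
]

first_only = keys_from_first_record(example_maps)
union = keys_from_all_records(example_maps)

# Keys that would be silently dropped when only the first record is inspected.
dropped = sorted(set(union) - set(first_only))
print(dropped)
```

If the real feature maps show a non-empty difference like this, deriving the column names from all rows, or directly from the training column list, would be one way to keep the inference schema aligned with training.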
Environment:
- Python version: 3.10.12
- Operating System: macOS Sequoia
- tsfresh version: 0.20.2
- Install method (conda, pip, source): pip