Commit 91ef816

Merge pull request #682 from ibm-watson-iot/9.0.x-spaceinsightsAI
9.0.x spaceinsights ai
2 parents 8e1b432 + ad9561b commit 91ef816

File tree

8 files changed: +344 -80 lines

iotfunctions/__init__.py (+1 -1)
@@ -11,5 +11,5 @@
 import os
 import pkgutil
 
-__version__ = '9.0.0'
+__version__ = '9.0.3'
 __all__ = list(module for (_, module, _) in pkgutil.iter_modules([os.path.dirname(__file__)]))

iotfunctions/aggregate.py (+108 -21)
@@ -311,7 +311,7 @@ def execute(self, df, start_ts=None, end_ts=None, entities=None, offset=None):
         ###############################################################################
         # concat all results
         ###############################################################################
-        df = pd.concat(all_dfs, axis=1)
+        df = pd.concat(all_dfs, axis=1, sort=True)
 
         # Corrective action: pd.concat() removes name from Index when we only have one level. There is no issue for
         # MultiIndex which is used for two and more levels
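
The sort=True added above matters when the frames in all_dfs do not share an identically ordered row index: for axis=1 the keyword controls whether the unioned (non-concatenation) index is sorted. A minimal sketch with made-up frames df_a and df_b, not data from this repository:

# Illustration only: pd.concat(axis=1, sort=True) sorts the row index when inputs are not aligned.
import pandas as pd

df_a = pd.DataFrame({"temp": [1.0, 2.0]}, index=["dev2", "dev1"])
df_b = pd.DataFrame({"speed": [10.0, 20.0]}, index=["dev1", "dev2"])

combined = pd.concat([df_a, df_b], axis=1, sort=True)
print(combined.index.tolist())  # ['dev1', 'dev2'] -- unioned row index is sorted before alignment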
@@ -702,7 +702,7 @@ def execute(self, group):
 
 class MsiOccupancyCount(DirectAggregator):
 
-    KPI_FUNCTION_NAME = "MSI_OccupancyCount"
+    KPI_FUNCTION_NAME = "OccupancyCount"
 
     BACKTRACK_IMPACTING_PARAMETER = "start_of_calculation"
 
@@ -744,8 +744,8 @@ def execute(self, df, group_base, group_base_names, start_ts=None, end_ts=None,
         sql_schema_name = check_sql_injection(self.dms.schema)
         sql_quoted_schema_name = dbhelper.quotingSchemaName(sql_schema_name, self.dms.is_postgre_sql)
 
-        sql_table_name = check_sql_injection(self.dms.entity_type_obj._data_items.get(self.output_name).get('sourceTableName'))
-        sql_quoted_table_name = dbhelper.quotingTableName(sql_table_name, self.dms.is_postgre_sql)
+        sql_output_table_name = check_sql_injection(self.dms.entity_type_obj._data_items.get(self.output_name).get('sourceTableName'))
+        sql_quoted_output_table_name = dbhelper.quotingTableName(sql_output_table_name, self.dms.is_postgre_sql)
 
         # Find data item representing the result of this KPI function
         result_data_item = self.dms.entity_type_obj._data_items.get(self.output_name)
@@ -816,7 +816,7 @@ def execute(self, df, group_base, group_base_names, start_ts=None, end_ts=None,
         s_missing_result_values = None
         if aligned_calc_start > aligned_cycle_start:
             # Load missing OccupancyCount values from output table (this only happens in BackTrack mode)
-            sql_statement = f'SELECT "VALUE_N", "TIMESTAMP", "ENTITY_ID" FROM {sql_quoted_schema_name}.{sql_quoted_table_name} ' \
+            sql_statement = f'SELECT "VALUE_N", "TIMESTAMP", "ENTITY_ID" FROM {sql_quoted_schema_name}.{sql_quoted_output_table_name} ' \
                             f'WHERE "KEY" = ? AND "TIMESTAMP" >= ? AND "TIMESTAMP" < ? AND ' \
                             f'"ENTITY_ID" IN ({entities_placeholders}) ORDER BY "ENTITY_ID", "TIMESTAMP" ASC'
 
@@ -849,31 +849,60 @@ def execute(self, df, group_base, group_base_names, start_ts=None, end_ts=None,
 
         # Because we only get a data event when the raw metric changes, but we want to provide values for all time
         # units of the derived metric we have to fetch the latest available OccupancyCount values from
-        # output table to fill gaps at the beginning. This step is not required for cycles in which we do not
+        # input table to fill gaps at the beginning. This step is not required for cycles in which we do not
         # calculate any OccupancyCount values.
         if aligned_calc_start < aligned_cycle_end:
             s_start_result_values = None
 
-            # Read just one result value per device right before aligned_calc_start.
-            sql_statement = f'WITH PREVIOUS_VALUES AS ' \
-                            f'(SELECT "VALUE_N", "ENTITY_ID", ' \
-                            f'ROW_NUMBER() OVER ( PARTITION BY "ENTITY_ID" ORDER BY "TIMESTAMP" DESC) ROW_NUM ' \
-                            f'FROM {sql_quoted_schema_name}.{sql_quoted_table_name} ' \
-                            f'WHERE "KEY" = ? AND "TIMESTAMP" < ? AND "ENTITY_ID" IN ({entities_placeholders})) ' \
-                            f'SELECT "VALUE_N", "ENTITY_ID" FROM PREVIOUS_VALUES WHERE ROW_NUM = 1'
+            # Read just one result value per device right before aligned_calc_start. Distinguish input data item
+            # between raw metric and derived metric:
+            input_data_item = self.dms.entity_type_obj._data_items.get(self.raw_occupancy_count)
+            if input_data_item.get('type') == 'METRIC':
+                input_data_item_is_raw = True
+
+                # Raw metric table as input source
+                sql_quoted_input_table_name = dbhelper.quotingTableName(check_sql_injection(self.dms.eventTable),
+                                                                        self.dms.is_postgre_sql)
+                sql_quoted_timestamp_col_name = dbhelper.quotingColumnName(check_sql_injection(self.dms.eventTimestampColumn), self.dms.is_postgre_sql)
+                sql_quoted_device_id_col_name = dbhelper.quotingColumnName(check_sql_injection(self.dms.entityIdColumn), self.dms.is_postgre_sql)
+                sql_quoted_input_data_item_col_name = dbhelper.quotingColumnName(check_sql_injection(input_data_item['columnName']), self.dms.is_postgre_sql)
+
+                sql_statement = f'WITH PREVIOUS_VALUES AS ' \
+                                f'(SELECT {sql_quoted_input_data_item_col_name}, {sql_quoted_device_id_col_name}, ' \
+                                f'ROW_NUMBER() OVER ( PARTITION BY {sql_quoted_device_id_col_name} ORDER BY {sql_quoted_timestamp_col_name} DESC) ROW_NUM ' \
+                                f'FROM {sql_quoted_schema_name}.{sql_quoted_input_table_name} ' \
+                                f'WHERE {sql_quoted_timestamp_col_name} < ? AND {sql_quoted_device_id_col_name} IN ({entities_placeholders})) ' \
+                                f'SELECT {sql_quoted_input_data_item_col_name}, {sql_quoted_device_id_col_name} FROM PREVIOUS_VALUES WHERE ROW_NUM = 1'
+            else:
+                input_data_item_is_raw = False
+
+                # Output table as input source
+                sql_quoted_input_table_name = dbhelper.quotingTableName(check_sql_injection(input_data_item.get('sourceTableName')),
+                                                                        self.dms.is_postgre_sql)
+
+                sql_statement = f'WITH PREVIOUS_VALUES AS ' \
+                                f'(SELECT "VALUE_N", "ENTITY_ID", ' \
+                                f'ROW_NUMBER() OVER ( PARTITION BY "ENTITY_ID" ORDER BY "TIMESTAMP" DESC) ROW_NUM ' \
+                                f'FROM {sql_quoted_schema_name}.{sql_quoted_input_table_name} ' \
+                                f'WHERE "KEY" = ? AND "TIMESTAMP" < ? AND "ENTITY_ID" IN ({entities_placeholders})) ' \
+                                f'SELECT "VALUE_N", "ENTITY_ID" FROM PREVIOUS_VALUES WHERE ROW_NUM = 1'
 
             stmt = ibm_db.prepare(self.dms.db_connection, sql_statement)
             try:
-                ibm_db.bind_param(stmt, 1, self.output_name)
-                ibm_db.bind_param(stmt, 2, max(aligned_calc_start, aligned_cycle_start))
-                for position, device in enumerate(entities, 3):
-                    ibm_db.bind_param(stmt, position, device)
+                position = 1
+                if input_data_item_is_raw is False:
+                    ibm_db.bind_param(stmt, position, self.raw_occupancy_count)
+                    position = position + 1
+                ibm_db.bind_param(stmt, position, max(aligned_calc_start, aligned_cycle_start))
+                position = position + 1
+                for pos, device in enumerate(entities, position):
+                    ibm_db.bind_param(stmt, pos, device)
                 ibm_db.execute(stmt)
                 row = ibm_db.fetch_tuple(stmt)
                 result_data = []
                 result_index = []
                 while row is not False:
-                    result_data.append(row[0])
+                    result_data.append(row[0] if pd.notna(row[0]) and row[0] > 0 else 0.0)
                     result_index.append(row[1])
                     row = ibm_db.fetch_tuple(stmt)
                 if len(result_data) > 0:
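
The rebased bind positions above follow from the two SQL variants: the output-table query has an extra "KEY" = ? placeholder that the raw-metric query does not. A plain-Python sketch (hypothetical names key_name, calc_start, device_ids, not part of the commit) of the ordering the position bookkeeping enforces:

# Illustration only: ordered bind values for the two query variants above.
def build_bind_values(input_is_raw, key_name, calc_start, device_ids):
    params = []
    if not input_is_raw:
        params.append(key_name)    # fills "KEY" = ?  -- only in the output-table variant
    params.append(calc_start)      # fills "TIMESTAMP" < ?
    params.extend(device_ids)      # one value per placeholder in the IN (...) list
    return params

# The positions passed to ibm_db.bind_param are then simply enumerate(params, start=1),
# which is what the position/pos counters in the diff implement.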
@@ -902,7 +931,12 @@ def execute(self, df, group_base, group_base_names, start_ts=None, end_ts=None,
             # Aggregate new column to get result metric. Result metric has name self.raw_output_name in data frame df_agg_result.
             # Columns in group_base_names go into index of df_agg_result. We search for the last raw occupancy count
             # in every aggregation interval.
-            df_agg_result = s_calc.groupby(group_base).agg(func=['max','last'])
+            # df_agg_result = s_calc.groupby(group_base).agg(func=['max','last'])
+            # Replace previous line by the following lines in an attempt to avoid internal bug in pandas
+            df_agg_result = s_calc.groupby(group_base).agg(func='max')
+            df_agg_result.name = 'max'
+            df_agg_result = df_agg_result.to_frame()
+            df_agg_result['last'] = s_calc.groupby(group_base).agg(func='last')
 
             # Rename column 'max' in df_agg_result to self.output_name
             df_agg_result.rename(columns={'max': self.output_name}, inplace=True)
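
The workaround above computes 'max' and 'last' in two separate groupby passes instead of a single agg(['max', 'last']) call; both forms yield the same two-column frame. A toy example with made-up data:

# Illustration only: two-step aggregation equivalent to s_calc.groupby(...).agg(['max', 'last'])
import pandas as pd

s_calc = pd.Series([1, 3, 2, 5, 4], index=["a", "a", "a", "b", "b"], name="occupancy")
group_base = s_calc.index

df_agg = s_calc.groupby(group_base).agg(func='max')
df_agg.name = 'max'
df_agg = df_agg.to_frame()
df_agg['last'] = s_calc.groupby(group_base).agg(func='last')
print(df_agg)
#    max  last
# a    3     2
# b    5     4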
@@ -955,7 +989,7 @@ def add_to_first(self, sub_s, value_map):
 
 class MsiOccupancy(DirectAggregator):
 
-    KPI_FUNCTION_NAME = "MSI_Occupancy"
+    KPI_FUNCTION_NAME = "OccupancyDuration"
 
     def __init__(self, occupancy_count, occupancy=None):
         super().__init__()
@@ -1019,6 +1053,59 @@ def execute(self, df, group_base, group_base_names, start_ts=None, end_ts=None,
             s_occupancy = s_occupancy.dt.total_seconds()
 
         else:
-            s_occupancy = pd.Series([], index=pd.MultiIndex.from_arrays([[], []], names=group_base_names), name=self.output_name, dtype='float64')
+            s_occupancy = pd.Series([], index=pd.MultiIndex.from_arrays([[], pd.DatetimeIndex([])], names=group_base_names), name=self.output_name, dtype='float64')
 
         return s_occupancy.to_frame()
+
+
+class MsiOccupancyLocation(DirectAggregator):
+
+    KPI_FUNCTION_NAME = "OccupancyLocation"
+
+    def __init__(self, x_pos, y_pos, name=None):
+
+        super().__init__()
+        self.logger = logging.getLogger('%s.%s' % (self.__module__, self.__class__.__name__))
+
+        if x_pos is not None and len(x_pos) > 0:
+            self.x_pos = x_pos
+        else:
+            raise RuntimeError(f"Function {self.KPI_FUNCTION_NAME} requires the parameter x_pos "
+                               f"but parameter x_pos is empty: x_pos={x_pos}")
+
+        if y_pos is not None and len(y_pos) > 0:
+            self.y_pos = y_pos
+        else:
+            raise RuntimeError(f"Function {self.KPI_FUNCTION_NAME} requires the parameter y_pos "
+                               f"but parameter y_pos is empty: y_pos={y_pos}")
+
+        if name is not None and len(name) > 0:
+            self.output_name = name
+        else:
+            raise RuntimeError(f"No name was provided for the metric which is calculated by function "
+                               f"{self.KPI_FUNCTION_NAME}: name={name}")
+
+    def execute(self, df, group_base, group_base_names, start_ts=None, end_ts=None, entities=None, offset=None):
+
+        if df.shape[0] > 0:
+            df_clean = df[[self.x_pos, self.y_pos]].dropna(how='any')
+            count_col_name = f'count{UNIQUE_EXTENSION_LABEL}'
+            df_clean[count_col_name] = 1
+            s_json = df_clean.groupby(group_base).apply(self.calc_locations, self.x_pos, self.y_pos, count_col_name)
+        else:
+            # No rows in data frame
+            s_json = pd.Series(data=[], index=df.index)
+
+        s_json.name = self.output_name
+        if s_json.dtype != object:
+            s_json = s_json.astype(object)
+
+        return s_json.to_frame()
+
+    def calc_locations(self, df, x_pos, y_pos, count_col_name):
+        s_count = df.groupby([x_pos, y_pos])[count_col_name].count()
+        x_y_count = []
+        for (x, y), count in s_count.items():
+            x_y_count.append({"x": x, "y": y, "count": count})
+
+        return {"data": x_y_count}
