9.0.x spaceinsights ai #682
Merged · 34 commits · Sep 12, 2024
Commits
2eb4c63  [patch] CLOB column + offset + floor()  (pkohlmann, Aug 1, 2024)
a604fba  [patch] replace last() by max() for HierarchySummary  (pkohlmann, Aug 2, 2024)
48d6eb6  Merge pull request #670 from ibm-watson-iot/kohlmann-9.0.x-SummaryFun…  (pkohlmann, Aug 2, 2024)
bd507bd  [patch] avoid internal pandas bug  (pkohlmann, Aug 5, 2024)
fe7e030  Merge remote-tracking branch 'origin/9.0.x' into kohlmann-9.0.x-Summa…  (pkohlmann, Aug 5, 2024)
e9a08d2  Merge pull request #671 from ibm-watson-iot/kohlmann-9.0.x-SummaryFun…  (pkohlmann, Aug 5, 2024)
5a4f725  [patch] new function related to HeatMap  (pkohlmann, Aug 7, 2024)
7be5af9  [patch] OccupancyLocation  (pkohlmann, Aug 8, 2024)
90ee4b3  [patch] new category AGGREGATOR_CLOB + adjust check_sql_injection() f…  (pkohlmann, Aug 8, 2024)
51d11f1  Merge remote-tracking branch 'origin/9.0.x' into kohlmann-9.0.x-Summa…  (pkohlmann, Aug 9, 2024)
37f73a6  Merge pull request #672 from ibm-watson-iot/kohlmann-9.0.x-SummaryFun…  (pkohlmann, Aug 9, 2024)
e4a751b  [patch] data item has type JSON  (pkohlmann, Aug 9, 2024)
4812680  Merge pull request #673 from ibm-watson-iot/kohlmann-9.0.x-SummaryFun…  (pkohlmann, Aug 9, 2024)
8f0e2b9  [patch] adjust JSON layout for OccupancyLocation  (pkohlmann, Aug 12, 2024)
de34541  Merge remote-tracking branch 'origin/9.0.x' into kohlmann-9.0.x-Summa…  (pkohlmann, Aug 12, 2024)
704253b  Merge pull request #674 from ibm-watson-iot/kohlmann-9.0.x-SummaryFun…  (pkohlmann, Aug 12, 2024)
39ecb90  [patch] last values for forward-fill at start-boundary instead of max…  (pkohlmann, Aug 16, 2024)
5778aba  Merge pull request #675 from ibm-watson-iot/kohlmann-9.0.x-SummaryFun…  (pkohlmann, Aug 16, 2024)
15c85e1  [patch] enforce DatetimeIndex in MultiIndex  (pkohlmann, Aug 19, 2024)
a263208  Merge pull request #676 from ibm-watson-iot/kohlmann-9.0.x-SummaryFun…  (pkohlmann, Aug 19, 2024)
57adf81  [patch] Allow dots in device name in sql injection check  (pkohlmann, Aug 20, 2024)
105e361  Merge pull request #677 from ibm-watson-iot/kohlmann-9.0.x-SummaryFun…  (pkohlmann, Aug 20, 2024)
994de30  [patch] Allow dots in device name in sql injection check II  (pkohlmann, Aug 20, 2024)
0e0636f  Merge pull request #678 from ibm-watson-iot/kohlmann-9.0.x-SummaryFun…  (pkohlmann, Aug 20, 2024)
97b5ccc  [patch] Allow dots in device name in sql injection check III  (pkohlmann, Aug 20, 2024)
53e93ce  Merge pull request #679 from ibm-watson-iot/kohlmann-9.0.x-SummaryFun…  (pkohlmann, Aug 20, 2024)
1d0d16e  Add RobustThreshold method for space insights  (sedgewickmm18, Sep 11, 2024)
2119bdb  update  (sedgewickmm18, Sep 11, 2024)
0214ee5  update  (sedgewickmm18, Sep 11, 2024)
5d32140  update  (sedgewickmm18, Sep 11, 2024)
41608b1  update  (sedgewickmm18, Sep 11, 2024)
ec2b4f2  update  (sedgewickmm18, Sep 11, 2024)
c86fca8  fix tests  (sedgewickmm18, Sep 11, 2024)
ad9561b  update  (sedgewickmm18, Sep 11, 2024)
2 changes: 1 addition & 1 deletion iotfunctions/__init__.py
@@ -11,5 +11,5 @@
import os
import pkgutil

-__version__ = '9.0.0'
+__version__ = '9.0.3'
__all__ = list(module for (_, module, _) in pkgutil.iter_modules([os.path.dirname(__file__)]))
129 changes: 108 additions & 21 deletions iotfunctions/aggregate.py
@@ -311,7 +311,7 @@ def execute(self, df, start_ts=None, end_ts=None, entities=None, offset=None):
###############################################################################
# concat all results
###############################################################################
-df = pd.concat(all_dfs, axis=1)
+df = pd.concat(all_dfs, axis=1, sort=True)

# Corrective action: pd.concat() removes name from Index when we only have one level. There is no issue for
# MultiIndex which is used for two and more levels
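Note for reviewers: the sort=True change and the index-name workaround can be reproduced outside the pipeline. A minimal sketch, assuming only pandas (frame and column names are invented):

import pandas as pd

# Two single-column frames over overlapping but differently ordered row sets
idx_a = pd.Index(['dev1', 'dev3'], name='entity_id')
idx_b = pd.Index(['dev3', 'dev2'], name='entity_id')
df_a = pd.DataFrame({'kpi_a': [1.0, 2.0]}, index=idx_a)
df_b = pd.DataFrame({'kpi_b': [3.0, 4.0]}, index=idx_b)

# sort=True sorts the union of the row labels deterministically; on older
# pandas versions it also suppresses the FutureWarning about implicit
# sorting of the non-concatenation axis.
df = pd.concat([df_a, df_b], axis=1, sort=True)

# Mirror of the corrective action described in the comment above: restore
# the index name in case pd.concat() dropped it for a single-level index.
df.index.name = 'entity_id'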
@@ -702,7 +702,7 @@ def execute(self, group)

class MsiOccupancyCount(DirectAggregator):

KPI_FUNCTION_NAME = "MSI_OccupancyCount"
KPI_FUNCTION_NAME = "OccupancyCount"

BACKTRACK_IMPACTING_PARAMETER = "start_of_calculation"

@@ -744,8 +744,8 @@ def execute(self, df, group_base, group_base_names, start_ts=None, end_ts=None,
sql_schema_name = check_sql_injection(self.dms.schema)
sql_quoted_schema_name = dbhelper.quotingSchemaName(sql_schema_name, self.dms.is_postgre_sql)

-sql_table_name = check_sql_injection(self.dms.entity_type_obj._data_items.get(self.output_name).get('sourceTableName'))
-sql_quoted_table_name = dbhelper.quotingTableName(sql_table_name, self.dms.is_postgre_sql)
+sql_output_table_name = check_sql_injection(self.dms.entity_type_obj._data_items.get(self.output_name).get('sourceTableName'))
+sql_quoted_output_table_name = dbhelper.quotingTableName(sql_output_table_name, self.dms.is_postgre_sql)

# Find data item representing the result of this KPI function
result_data_item = self.dms.entity_type_obj._data_items.get(self.output_name)
@@ -816,7 +816,7 @@ def execute(self, df, group_base, group_base_names, start_ts=None, end_ts=None,
s_missing_result_values = None
if aligned_calc_start > aligned_cycle_start:
# Load missing OccupancyCount values from output table (this only happens in BackTrack mode)
sql_statement = f'SELECT "VALUE_N", "TIMESTAMP", "ENTITY_ID" FROM {sql_quoted_schema_name}.{sql_quoted_table_name} ' \
sql_statement = f'SELECT "VALUE_N", "TIMESTAMP", "ENTITY_ID" FROM {sql_quoted_schema_name}.{sql_quoted_output_table_name} ' \
f'WHERE "KEY" = ? AND "TIMESTAMP" >= ? AND "TIMESTAMP" < ? AND ' \
f'"ENTITY_ID" IN ({entities_placeholders}) ORDER BY "ENTITY_ID", "TIMESTAMP" ASC'

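Note for reviewers: entities_placeholders in the statement above is the usual one-?-per-entity pattern for a parameterized IN list with ibm_db. A standalone sketch, with invented connection string, schema, table and KPI key:

import ibm_db

# Invented connection details, for illustration only
conn = ibm_db.connect('DATABASE=BLUDB;HOSTNAME=db.example.com;PORT=50000;UID=user;PWD=secret', '', '')

entities = ['dev1', 'dev2', 'dev3']
entities_placeholders = ', '.join(['?'] * len(entities))   # "?, ?, ?"

sql_statement = f'SELECT "VALUE_N", "TIMESTAMP", "ENTITY_ID" FROM "MYSCHEMA"."MYTABLE" ' \
                f'WHERE "KEY" = ? AND "TIMESTAMP" >= ? AND "TIMESTAMP" < ? AND ' \
                f'"ENTITY_ID" IN ({entities_placeholders}) ORDER BY "ENTITY_ID", "TIMESTAMP" ASC'

stmt = ibm_db.prepare(conn, sql_statement)
ibm_db.bind_param(stmt, 1, 'my_output_kpi')          # KEY
ibm_db.bind_param(stmt, 2, '2024-08-01 00:00:00')    # lower bound (inclusive)
ibm_db.bind_param(stmt, 3, '2024-08-02 00:00:00')    # upper bound (exclusive)
for position, device in enumerate(entities, 4):      # one marker per entity, starting at position 4
    ibm_db.bind_param(stmt, position, device)
ibm_db.execute(stmt)

row = ibm_db.fetch_tuple(stmt)                       # fetch_tuple() returns False when exhausted
while row is not False:
    value_n, timestamp, entity_id = row
    row = ibm_db.fetch_tuple(stmt)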
@@ -849,31 +849,60 @@

# Because we only get a data event when the raw metric changes, but we want to provide values for all time
# units of the derived metric we have to fetch the latest available OccupancyCount values from
-# output table to fill gaps at the beginning. This step is not required for cycles in which we do not
+# input table to fill gaps at the beginning. This step is not required for cycles in which we do not
# calculate any OccupancyCount values.
if aligned_calc_start < aligned_cycle_end:
s_start_result_values = None

-# Read just one result value per device right before aligned_calc_start.
-sql_statement = f'WITH PREVIOUS_VALUES AS ' \
-                f'(SELECT "VALUE_N", "ENTITY_ID", ' \
-                f'ROW_NUMBER() OVER ( PARTITION BY "ENTITY_ID" ORDER BY "TIMESTAMP" DESC) ROW_NUM ' \
-                f'FROM {sql_quoted_schema_name}.{sql_quoted_table_name} ' \
-                f'WHERE "KEY" = ? AND "TIMESTAMP" < ? AND "ENTITY_ID" IN ({entities_placeholders})) ' \
-                f'SELECT "VALUE_N", "ENTITY_ID" FROM PREVIOUS_VALUES WHERE ROW_NUM = 1'
+# Read just one result value per device right before aligned_calc_start. Distinguish input data item
+# between raw metric and derived metric:
+input_data_item = self.dms.entity_type_obj._data_items.get(self.raw_occupancy_count)
+if input_data_item.get('type') == 'METRIC':
+    input_data_item_is_raw = True
+
+    # Raw metric table as input source
+    sql_quoted_input_table_name = dbhelper.quotingTableName(check_sql_injection(self.dms.eventTable),
+                                                            self.dms.is_postgre_sql)
+    sql_quoted_timestamp_col_name = dbhelper.quotingColumnName(check_sql_injection(self.dms.eventTimestampColumn), self.dms.is_postgre_sql)
+    sql_quoted_device_id_col_name = dbhelper.quotingColumnName(check_sql_injection(self.dms.entityIdColumn), self.dms.is_postgre_sql)
+    sql_quoted_input_data_item_col_name = dbhelper.quotingColumnName(check_sql_injection(input_data_item['columnName']), self.dms.is_postgre_sql)
+
+    sql_statement = f'WITH PREVIOUS_VALUES AS ' \
+                    f'(SELECT {sql_quoted_input_data_item_col_name}, {sql_quoted_device_id_col_name}, ' \
+                    f'ROW_NUMBER() OVER ( PARTITION BY {sql_quoted_device_id_col_name} ORDER BY {sql_quoted_timestamp_col_name} DESC) ROW_NUM ' \
+                    f'FROM {sql_quoted_schema_name}.{sql_quoted_input_table_name} ' \
+                    f'WHERE {sql_quoted_timestamp_col_name} < ? AND {sql_quoted_device_id_col_name} IN ({entities_placeholders})) ' \
+                    f'SELECT {sql_quoted_input_data_item_col_name}, {sql_quoted_device_id_col_name} FROM PREVIOUS_VALUES WHERE ROW_NUM = 1'
+else:
+    input_data_item_is_raw = False
+
+    # Output table as input source
+    sql_quoted_input_table_name = dbhelper.quotingTableName(check_sql_injection(input_data_item.get('sourceTableName')),
+                                                            self.dms.is_postgre_sql)
+
+    sql_statement = f'WITH PREVIOUS_VALUES AS ' \
+                    f'(SELECT "VALUE_N", "ENTITY_ID", ' \
+                    f'ROW_NUMBER() OVER ( PARTITION BY "ENTITY_ID" ORDER BY "TIMESTAMP" DESC) ROW_NUM ' \
+                    f'FROM {sql_quoted_schema_name}.{sql_quoted_input_table_name} ' \
+                    f'WHERE "KEY" = ? AND "TIMESTAMP" < ? AND "ENTITY_ID" IN ({entities_placeholders})) ' \
+                    f'SELECT "VALUE_N", "ENTITY_ID" FROM PREVIOUS_VALUES WHERE ROW_NUM = 1'

stmt = ibm_db.prepare(self.dms.db_connection, sql_statement)
try:
-ibm_db.bind_param(stmt, 1, self.output_name)
-ibm_db.bind_param(stmt, 2, max(aligned_calc_start, aligned_cycle_start))
-for position, device in enumerate(entities, 3):
-    ibm_db.bind_param(stmt, position, device)
+position = 1
+if input_data_item_is_raw is False:
+    ibm_db.bind_param(stmt, position, self.raw_occupancy_count)
+    position = position + 1
+ibm_db.bind_param(stmt, position, max(aligned_calc_start, aligned_cycle_start))
+position = position + 1
+for pos, device in enumerate(entities, position):
+    ibm_db.bind_param(stmt, pos, device)
ibm_db.execute(stmt)
row = ibm_db.fetch_tuple(stmt)
result_data = []
result_index = []
while row is not False:
-result_data.append(row[0])
+result_data.append(row[0] if pd.notna(row[0]) and row[0] > 0 else 0.0)
result_index.append(row[1])
row = ibm_db.fetch_tuple(stmt)
if len(result_data) > 0:
@@ -902,7 +931,12 @@ def execute(self, df, group_base, group_base_names, start_ts=None, end_ts=None,
# Aggregate new column to get result metric. Result metric has name self.raw_output_name in data frame df_agg_result.
# Columns in group_base_names go into index of df_agg_result. We search for the last raw occupancy count
# in every aggregation interval.
-df_agg_result = s_calc.groupby(group_base).agg(func=['max','last'])
+# df_agg_result = s_calc.groupby(group_base).agg(func=['max','last'])
+# Replace previous line by the following lines in an attempt to avoid internal bug in pandas
+df_agg_result = s_calc.groupby(group_base).agg(func='max')
+df_agg_result.name = 'max'
+df_agg_result = df_agg_result.to_frame()
+df_agg_result['last'] = s_calc.groupby(group_base).agg(func='last')

# Rename column 'max' in df_agg_result to self.output_name
df_agg_result.rename(columns={'max': self.output_name}, inplace=True)
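Note for reviewers: the split-aggregation workaround above (commit bd507bd, "avoid internal pandas bug") can be exercised in isolation. A minimal sketch with made-up data:

import pandas as pd

idx = pd.MultiIndex.from_arrays(
    [['dev1', 'dev1', 'dev2'],
     pd.to_datetime(['2024-08-01', '2024-08-01', '2024-08-01'])],
    names=['entity_id', 'timestamp'])
s_calc = pd.Series([3.0, 1.0, 2.0], index=idx, name='occupancy_count')
group_base = ['entity_id', 'timestamp']

# Instead of one combined call s_calc.groupby(group_base).agg(['max', 'last']),
# which triggered the internal pandas error, run the two aggregations
# separately and assemble the result frame column by column:
df_agg_result = s_calc.groupby(group_base).agg('max')    # dev1 -> 3.0, dev2 -> 2.0
df_agg_result.name = 'max'
df_agg_result = df_agg_result.to_frame()
df_agg_result['last'] = s_calc.groupby(group_base).agg('last')   # dev1 -> 1.0, dev2 -> 2.0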
@@ -955,7 +989,7 @@ def add_to_first(self, sub_s, value_map):

class MsiOccupancy(DirectAggregator):

KPI_FUNCTION_NAME = "MSI_Occupancy"
KPI_FUNCTION_NAME = "OccupancyDuration"

def __init__(self, occupancy_count, occupancy=None):
super().__init__()
@@ -1019,6 +1053,59 @@ def execute(self, df, group_base, group_base_names, start_ts=None, end_ts=None,
s_occupancy = s_occupancy.dt.total_seconds()

else:
-s_occupancy = pd.Series([], index=pd.MultiIndex.from_arrays([[], []], names=group_base_names), name=self.output_name, dtype='float64')
+s_occupancy = pd.Series([], index=pd.MultiIndex.from_arrays([[], pd.DatetimeIndex([])], names=group_base_names), name=self.output_name, dtype='float64')

return s_occupancy.to_frame()
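Note for reviewers: the empty-result change above (commit 15c85e1, "enforce DatetimeIndex in MultiIndex") matters because plain empty lists give the timestamp level an object dtype, while an explicit empty DatetimeIndex keeps it datetime64, so empty and non-empty cycle results concatenate with consistent index dtypes. A quick illustration (level names assumed):

import pandas as pd

group_base_names = ['entity_id', 'timestamp']

# Plain empty lists leave the timestamp level as object dtype ...
mi_plain = pd.MultiIndex.from_arrays([[], []], names=group_base_names)
print(mi_plain.levels[1].dtype)    # object

# ... while an explicit empty DatetimeIndex preserves datetime64
mi_dt = pd.MultiIndex.from_arrays([[], pd.DatetimeIndex([])], names=group_base_names)
print(mi_dt.levels[1].dtype)       # datetime64[ns]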


+class MsiOccupancyLocation(DirectAggregator):
+
+    KPI_FUNCTION_NAME = "OccupancyLocation"
+
+    def __init__(self, x_pos, y_pos, name=None):
+
+        super().__init__()
+        self.logger = logging.getLogger('%s.%s' % (self.__module__, self.__class__.__name__))
+
+        if x_pos is not None and len(x_pos) > 0:
+            self.x_pos = x_pos
+        else:
+            raise RuntimeError(f"Function {self.KPI_FUNCTION_NAME} requires the parameter x_pos "
+                               f"but parameter x_pos is empty: x_pos={x_pos}")
+
+        if y_pos is not None and len(y_pos) > 0:
+            self.y_pos = y_pos
+        else:
+            raise RuntimeError(f"Function {self.KPI_FUNCTION_NAME} requires the parameter y_pos "
+                               f"but parameter y_pos is empty: y_pos={y_pos}")
+
+        if name is not None and len(name) > 0:
+            self.output_name = name
+        else:
+            raise RuntimeError(f"No name was provided for the metric which is calculated by function "
+                               f"{self.KPI_FUNCTION_NAME}: name={name}")
+
+    def execute(self, df, group_base, group_base_names, start_ts=None, end_ts=None, entities=None, offset=None):
+
+        if df.shape[0] > 0:
+            df_clean = df[[self.x_pos, self.y_pos]].dropna(how='any')
+            count_col_name = f'count{UNIQUE_EXTENSION_LABEL}'
+            df_clean[count_col_name] = 1
+            s_json = df_clean.groupby(group_base).apply(self.calc_locations, self.x_pos, self.y_pos, count_col_name)
+        else:
+            # No rows in data frame
+            s_json = pd.Series(data=[], index=df.index)
+
+        s_json.name = self.output_name
+        if s_json.dtype != object:
+            s_json = s_json.astype(object)
+
+        return s_json.to_frame()
+
+    def calc_locations(self, df, x_pos, y_pos, count_col_name):
+        s_count = df.groupby([x_pos, y_pos])[count_col_name].count()
+        x_y_count = []
+        for (x, y), count in s_count.items():
+            x_y_count.append({"x": x, "y": y, "count": count})
+
+        return {"data": x_y_count}