diff --git a/apps/mobility_robustness_optimization/mobility_robustness_optimization.py b/apps/mobility_robustness_optimization/mobility_robustness_optimization.py new file mode 100644 index 0000000..5856011 --- /dev/null +++ b/apps/mobility_robustness_optimization/mobility_robustness_optimization.py @@ -0,0 +1,540 @@ +import pandas as pd +import numpy as np +import pickle +import os +from typing import Any, Dict, List, Tuple, Optional + +from radp.digital_twin.utils import constants +from radp.digital_twin.utils.gis_tools import GISTools +from radp.digital_twin.rf.bayesian.bayesian_engine import ( + BayesianDigitalTwin, + NormMethod, +) +from notebooks.radp_library import get_percell_data +from radp.digital_twin.utils.cell_selection import perform_attachment +from notebooks.radp_library import get_ue_data + + +class MobilityRobustnessOptimization: + """ + A class that contains a prototypical proof-of-concept of an `Mobility Robustness Optimization (MRO)` RIC xApp. + """ + + def __init__( + self, + mobility_params: Dict[str, Dict], + topology: pd.DataFrame, + bdt: Optional[Dict[str, BayesianDigitalTwin]] = None, + ): + self.topology = topology + self.tx_power_dbm = 23 + self.bayesian_digital_twins = bdt if bdt is not None else {} + self.mobility_params = mobility_params + self.training_data = None + self.prediction_data = None + self.update_data = None + self.simulation_data = None + + def update(self, new_data: pd.DataFrame): + """ + (Re-)train Bayesian Digital Twins for each cell. + TODO: Add expected := [lat, lon, cell_id, "rsrp_dbm"] and redefine the method. + """ + try: + if not isinstance(new_data, pd.DataFrame): + raise TypeError("The input 'new_data' must be a pandas DataFrame.") + + expected_columns = {"mock_ue_id", "longitude", "latitude", "tick"} + if not expected_columns.issubset(new_data.columns): + raise ValueError( + f"The input DataFrame must contain the following columns: {expected_columns}" + ) + + if self.bayesian_digital_twins: + self.update_data = new_data + updated_data = self._preprocess_ue_update_data() + updated_data_list = list(updated_data.values()) + + for data_idx, update_data_df in enumerate(updated_data_list): + update_cell_id = data_idx + 1 + if update_cell_id in self.bayesian_digital_twins: + self.bayesian_digital_twins[ + update_cell_id + ].update_trained_gpmodel([update_data_df]) + else: + print( + "No Bayesian Digital Twins available for update. Training from scratch." + ) + self.training(maxiter=100, train_data=new_data) + except TypeError as te: + print(f"TypeError: {te}") + except ValueError as ve: + print(f"ValueError: {ve}") + except KeyError as ke: + print(f"KeyError: {ke}") + except Exception as e: + print(f"An unexpected error occurred: {e}") + + def save(bayesian_digital_twins, file_loc): + """ + Saves the Bayesian Digital Twins to a pickle file. Returns `True` if saving succeeds, + and `NotImplemented` if it fails. + + """ + filename = f"{file_loc}/digital_twins.pkl" + try: + if not isinstance(bayesian_digital_twins, dict): + raise TypeError("The input 'bayesian_digital_twins' must be a dictionary.") + + # Ensure the directory exists + os.makedirs(file_loc, exist_ok=True) + + with open(filename, "wb") as fp: + pickle.dump(bayesian_digital_twins, fp) + + print("Twins Saved Successfully as Pickle.") + return True # Indicate successful save + except TypeError as te: + print(f"TypeError: {te}") + except OSError as oe: + print(f"OSError: {oe}") + except Exception as e: + print(f"An unexpected error occurred: {e}") + + return NotImplemented # Return NotImplemented on failure + + def solve(self) -> Optional[Tuple[float]]: + """ + Iteratively optimizes the cell attachment strategy to find the best MRO metric. + Currently, 'perform_attachment' has no parameters to optimize, so this function + will focus on evaluating its current implementation. This setup is ready to be + expanded for parameter optimization in future developments. + """ + best_metric = None + + # Example of a simple loop that could be adapted for parameter tuning + # Since perform_attachment has no parameters now, we simulate one configuration + # This loop can be adapted to iterate over parameter sets for perform_attachment + for _ in range(1): # Single iteration for now, as there are no parameters to tune + mro_metric = self._calculate_metric() + + # Initialize or update the best_metric + if best_metric is None or mro_metric > best_metric: + best_metric = mro_metric + + return best_metric + + def _calculate_metric(self) -> float: + """ + Conducts the process of generating user equipment (UE) data, making predictions, + and computing Mobility Robustness Optimization (MRO) metrics. + """ + # Ensure Bayesian Digital Twins are trained before proceeding + if not self.bayesian_digital_twins: + raise ValueError("Bayesian Digital Twins are not trained. Train the models before calculating metrics.") + + # Generate and preprocess simulation data + self.simulation_data = get_ue_data(self.mobility_params) + self.simulation_data = self.simulation_data.rename(columns={"lat": "latitude", "lon": "longitude"}) + + # Predict power and perform attachment + predictions, full_prediction_df = self._predictions(self.simulation_data) + + # Reattach columns to combine original simulation data with the predictions + reattached_data = reattach_columns(predictions, full_prediction_df) + + # Count the number of successful and failed handovers + ns_handovers, nf_handovers, no_change = count_handovers(reattached_data) + + # Calculate and return the MRO Metric + mro_metric = calculate_mro_metric(ns_handovers, nf_handovers, self.simulation_data) + return mro_metric + + + def _training(self, maxiter: int, train_data: pd.DataFrame) -> List[float]: + """ + Trains the Bayesian Digital Twins for each cell in the topology using the UE locations and features + like log distance, relative bearing, and cell received power (Rx power). + """ + self.training_data = train_data + training_data = self._preprocess_ue_training_data() + bayesian_digital_twins = {} + loss_vs_iters = [] + for train_cell_id, training_data_idx in training_data.items(): + bayesian_digital_twins[train_cell_id] = BayesianDigitalTwin( + data_in=[training_data_idx], + x_columns=["log_distance", "relative_bearing"], + y_columns=["cell_rxpwr_dbm"], + norm_method=NormMethod.MINMAX, + ) + self.bayesian_digital_twins[train_cell_id] = bayesian_digital_twins[ + train_cell_id + ] + loss_vs_iters.append( + bayesian_digital_twins[train_cell_id].train_distributed_gpmodel( + maxiter=maxiter, + ) + ) + return loss_vs_iters + + def _predictions(self, pred_data) -> Tuple[pd.DataFrame, pd.DataFrame]: + """ + Predicts the received power for each User Equipment (UE) at different locations and ticks using Bayesian Digital Twins. + It then determines the best cell for each UE to attach based on the predicted power values. + """ + self.prediction_data = pred_data + prediction_data = self._preprocess_prediction_data() + full_prediction_df = pd.DataFrame() + + # Loop over each 'tick' + for tick, tick_df in prediction_data.groupby("tick"): + # Loop over each 'cell_id' within the current 'tick' + for cell_id, cell_df in tick_df.groupby("cell_id"): + cell_id = f"cell_{cell_id}" # FIXME: should look better + # Check if the Bayesian model for this cell_id exists + if cell_id in self.bayesian_digital_twins: + # Perform the Bayesian prediction + pred_means_percell, _ = self.bayesian_digital_twins[ + cell_id + ].predict_distributed_gpmodel(prediction_dfs=[cell_df]) + + # Assuming 'pred_means_percell' returns a list of predictions corresponding to the DataFrame index + cell_df["pred_means"] = pred_means_percell[0] + + # Include additional necessary columns for the final DataFrame + cell_df["tick"] = tick + cell_df["cell_id"] = cell_id + + # Append the predictions to the full DataFrame + full_prediction_df = pd.concat( + [full_prediction_df, cell_df], ignore_index=True + ) + else: + # Handle missing models, e.g., log a warning or initialize a default model + print( + f"No model available for cell_id {cell_id}, skipping prediction." + ) + + full_prediction_df = full_prediction_df.rename( + columns={"latitude": "loc_y", "longitude": "loc_x"} + ) + predicted = perform_attachment(full_prediction_df, self.topology) + + return predicted, full_prediction_df + + def _prepare_all_UEs_from_all_cells_df( + self, prediction: bool = False, simulation: bool = False, update: bool = False + ) -> pd.DataFrame: + """ + Connects each user equipment (UE) entry to all cells in the topology for each tick, + effectively creating a Cartesian product of UEs and cells, which includes data from both sources. + """ + + if prediction: + ue_data_tmp = self.prediction_data.copy() + elif simulation: + ue_data_tmp = self.simulation_data.copy() + elif update: + ue_data_tmp = self.update_data.copy() + else: + ue_data_tmp = self.training_data.copy() + topology_tmp = self.topology.copy() + # Remove the 'cell_' prefix and convert cell_id to integer if needed + if topology_tmp["cell_id"].dtype == object: + topology_tmp["cell_id"] = ( + topology_tmp["cell_id"].str.replace("cell_", "").astype(int) + ) + ue_data_tmp["key"] = 1 + topology_tmp["key"] = 1 + combined_df = pd.merge(ue_data_tmp, topology_tmp, on="key").drop("key", axis=1) + print(combined_df) + return combined_df + + def _calculate_received_power( + self, distance_km: float, frequency_mhz: int + ) -> float: + """ + Calculate received power using the Free-Space Path Loss (FSPL) model. + """ + # Convert distance from kilometers to meters + distance_m = distance_km * 1000 + + # Calculate Free-Space Path Loss (FSPL) in dB + fspl_db = 20 * np.log10(distance_m) + 20 * np.log10(frequency_mhz) - 27.55 + + # Calculate and return the received power in dBm + received_power_dbm = self.tx_power_dbm - fspl_db + return received_power_dbm + + def _preprocess_ue_topology_data(self) -> pd.DataFrame: + full_data = self._prepare_all_UEs_from_all_cells_df() + full_data["log_distance"] = full_data.apply( + lambda row: GISTools.get_log_distance( + row["latitude"], row["longitude"], row["cell_lat"], row["cell_lon"] + ), + axis=1, + ) + + full_data["cell_rxpwr_dbm"] = full_data.apply( + lambda row: self._calculate_received_power( + row["log_distance"], row["cell_carrier_freq_mhz"] + ), + axis=1, + ) + + return full_data + + # Change the type hint from pd.Dataframe to Dict for _preprocess_ue_training_data and _preprocess_ue_update_data + def _preprocess_ue_training_data(self) -> pd.DataFrame: + data = self._preprocess_ue_topology_data() + train_per_cell_df = [x for _, x in data.groupby("cell_id")] + n_cell = len(self.topology.index) + + metadata_df = pd.DataFrame( + { + "cell_id": [cell_id for cell_id in self.topology.cell_id], + "idx": [i + 1 for i in range(n_cell)], + } + ) + idx_cell_id_mapping = dict(zip(metadata_df.idx, metadata_df.cell_id)) + desired_idxs = [1 + r for r in range(n_cell)] + + n_samples_train = [] + for df in train_per_cell_df: + n_samples_train.append(df.shape[0]) + + train_per_cell_df_processed = [] + for i in range(n_cell): + train_per_cell_df_processed.append( + get_percell_data( + data_in=train_per_cell_df[i], + choose_strongest_samples_percell=False, + n_samples=n_samples_train[i], + )[0][0] + ) + + training_data = {} + + for i, df in enumerate(train_per_cell_df_processed): + train_cell_id = idx_cell_id_mapping[i + 1] + training_data[train_cell_id] = df + + for train_cell_id, training_data_idx in training_data.items(): + training_data_idx["cell_id"] = train_cell_id + training_data_idx["cell_lat"] = self.topology[ + self.topology["cell_id"] == train_cell_id + ]["cell_lat"].values[0] + training_data_idx["cell_lon"] = self.topology[ + self.topology["cell_id"] == train_cell_id + ]["cell_lon"].values[0] + training_data_idx["cell_az_deg"] = self.topology[ + self.topology["cell_id"] == train_cell_id + ]["cell_az_deg"].values[0] + training_data_idx["cell_carrier_freq_mhz"] = self.topology[ + self.topology["cell_id"] == train_cell_id + ]["cell_carrier_freq_mhz"].values[0] + training_data_idx["relative_bearing"] = [ + GISTools.get_relative_bearing( + training_data_idx["cell_az_deg"].values[0], + training_data_idx["cell_lat"].values[0], + training_data_idx["cell_lon"].values[0], + lat, + lon, + ) + for lat, lon in zip( + training_data_idx["latitude"], training_data_idx["longitude"] + ) + ] + + return training_data + + def _preprocess_ue_update_data(self) -> pd.DataFrame: + data = self._prepare_all_UEs_from_all_cells_df(update=True) + data["log_distance"] = data.apply( + lambda row: GISTools.get_log_distance( + row["latitude"], row["longitude"], row["cell_lat"], row["cell_lon"] + ), + axis=1, + ) + + data["cell_rxpwr_dbm"] = data.apply( + lambda row: self._calculate_received_power( + row["log_distance"], row["cell_carrier_freq_mhz"] + ), + axis=1, + ) + + update_per_cell_df = [x for _, x in data.groupby("cell_id")] + n_cell = len(self.topology.index) + + metadata_df = pd.DataFrame( + { + "cell_id": [cell_id for cell_id in self.topology.cell_id], + "idx": [i + 1 for i in range(n_cell)], + } + ) + idx_cell_id_mapping = dict(zip(metadata_df.idx, metadata_df.cell_id)) + + n_samples_update = [] + for df in update_per_cell_df: + n_samples_update.append(df.shape[0]) + + update_per_cell_df_processed = [] + for i in range(n_cell): + update_per_cell_df_processed.append( + get_percell_data( + data_in=update_per_cell_df[i], + choose_strongest_samples_percell=False, + n_samples=n_samples_update[i], + )[0][0] + ) + + update_data = {} + + for i, df in enumerate(update_per_cell_df_processed): + update_cell_id = idx_cell_id_mapping[i + 1] + update_data[update_cell_id] = df + + for update_cell_id, update_data_idx in update_data.items(): + update_data_idx["cell_id"] = update_cell_id + update_data_idx["cell_lat"] = self.topology[ + self.topology["cell_id"] == update_cell_id + ]["cell_lat"].values[0] + update_data_idx["cell_lon"] = self.topology[ + self.topology["cell_id"] == update_cell_id + ]["cell_lon"].values[0] + update_data_idx["cell_az_deg"] = self.topology[ + self.topology["cell_id"] == update_cell_id + ]["cell_az_deg"].values[0] + update_data_idx["cell_carrier_freq_mhz"] = self.topology[ + self.topology["cell_id"] == update_cell_id + ]["cell_carrier_freq_mhz"].values[0] + update_data_idx["relative_bearing"] = [ + GISTools.get_relative_bearing( + update_data_idx["cell_az_deg"].values[0], + update_data_idx["cell_lat"].values[0], + update_data_idx["cell_lon"].values[0], + lat, + lon, + ) + for lat, lon in zip( + update_data_idx["latitude"], update_data_idx["longitude"] + ) + ] + return update_data + + def _preprocess_prediction_data(self) -> pd.DataFrame: + data = self._prepare_all_UEs_from_all_cells_df(prediction=True) + + data["log_distance"] = data.apply( + lambda row: GISTools.get_log_distance( + row["latitude"], row["longitude"], row["cell_lat"], row["cell_lon"] + ), + axis=1, + ) + data["cell_rxpwr_dbm"] = data.apply( + lambda row: self._calculate_received_power( + row["log_distance"], row["cell_carrier_freq_mhz"] + ), + axis=1, + ) + + data["relative_bearing"] = data.apply( + lambda row: GISTools.get_relative_bearing( + row["cell_az_deg"], + row["cell_lat"], + row["cell_lon"], + row["latitude"], + row["longitude"], + ), + axis=1, + ) + return data + + +# Functions for MRO metrics and Handover events +def count_handovers(df): + # Initialize counters for NS (Successful Handovers), NF (Radio Link Failures), and no change + ns_handover_count = 0 + nf_handover_count = 0 + no_change = 0 + + # Threshold for considering a Radio Link Failure (RLF) + rlf_threshold = -2.9 + + # Track the previous state for each UE + ue_previous_state = {} + + # Loop through the dataframe row by row + for _, row in df.iterrows(): + ue_id = row["ue_id"] + current_cell_id = row["cell_id"] + current_sinr_db = row["sinr_db"] + + # Check if we have previous state for this UE + if ue_id in ue_previous_state: + previous_cell_id, previous_sinr_db = ue_previous_state[ue_id] + + # Check for cell ID change + if previous_cell_id != current_cell_id: + # Check if the SINR is above the threshold after a cell change + if current_sinr_db >= rlf_threshold: + ns_handover_count += 1 # Successful handover + else: + nf_handover_count += 1 # Failed handover due to RLF after change + elif previous_sinr_db < rlf_threshold and current_sinr_db >= rlf_threshold: + ns_handover_count += ( + 1 # Successful recovery from RLF without cell change + ) + elif current_sinr_db < rlf_threshold: + nf_handover_count += 1 # Ongoing or new RLF + else: + no_change += 1 # No significant event + + else: + # If first occurrence of UE has SINR below the RLF threshold, consider it as RLF + if current_sinr_db < rlf_threshold: + nf_handover_count += 1 + else: + no_change += 1 # No significant event when UE first appears and SINR is above threshold + + # Update the state for this UE + ue_previous_state[ue_id] = (current_cell_id, current_sinr_db) + + return ns_handover_count, nf_handover_count, no_change + + +def reattach_columns(predicted_df, full_prediction_df): + # Filter full_prediction_df for the needed columns and drop duplicates based on loc_x and loc_y + filtered_full_df = full_prediction_df[ + ["mock_ue_id", "tick", "loc_x", "loc_y"] + ].drop_duplicates(subset=["loc_x", "loc_y"]) + + # Merge with predicted_df based on loc_x and loc_y, ensuring size matches predicted_df + merged_df = pd.merge( + predicted_df, filtered_full_df, on=["loc_x", "loc_y"], how="left" + ) + + # Rename mock_ue_id to ue_id + merged_df.rename(columns={"mock_ue_id": "ue_id"}, inplace=True) + + return merged_df + + +def calculate_mro_metric(ns_handover_count, nf_handover_count, prediction_ue_data): + # Constants for interruption times + ts = 50 / 1000 # Convert ms to seconds + t_nas = 1000 / 1000 # Convert ms to seconds + + # Calculate total time (T) based on ticks; assuming each tick represents a uniform time slice + # This could be adjusted if ticks represent variable time slices + # Rather than passing the UE Data as whole we can send just an integar for tick + ticks = len(prediction_ue_data["tick"].unique()) + # Assuming each tick represents 50ms (this value may need to be adjusted based on actual data characteristics) + tick_duration_seconds = 1 # 1 second per tick + T = ticks * tick_duration_seconds + + # Calculate D + D = T - (ns_handover_count * ts + nf_handover_count * t_nas) + + return D diff --git a/apps/mobility_robustness_optimization/tests/test_mobility_robustness_optimization.py b/apps/mobility_robustness_optimization/tests/test_mobility_robustness_optimization.py new file mode 100644 index 0000000..fcb8e26 --- /dev/null +++ b/apps/mobility_robustness_optimization/tests/test_mobility_robustness_optimization.py @@ -0,0 +1,307 @@ +import unittest +import pandas as pd +import numpy as np +from apps.mobility_robustness_optimization.mobility_robustness_optimization import ( + MobilityRobustnessOptimization as MRO, + BayesianDigitalTwin, + NormMethod, + reattach_columns, + calculate_mro_metric, + count_handovers, +) +from unittest.mock import MagicMock + + +class TestMobilityRobustnessOptimization(unittest.TestCase): + def setUp(self): + self.dummy_topology = pd.DataFrame( + { + "cell_id": ["cell_001", "cell_002"], + "cell_lat": [45.0, 46.0], + "cell_lon": [-73.0, -74.0], + "cell_carrier_freq_mhz": [2100, 2000], + "cell_az_deg": [120, 240], + } + ) + + # Mocking additional data attributes used in the method + self.prediction_data = pd.DataFrame( + { + "ue_id": [0, 1], + "tick": [0, 1], + "loc_x": [10.0, 20.0], + "loc_y": [5.0, 6.0], + } + ) + self.simulation_data = pd.DataFrame( + { + "ue_id": [0, 1], + "tick": [0, 1], + "lon": [15.0, 25.0], + "lat": [10.0, 11.0], + "cell_id": [1, 2], + "cell_lat": [45.0, 46.0], + "cell_lon": [-73.0, -74.0], + "cell_carrier_freq_mhz": [ + 1800, + 2100, + ], + } + ) + self.update_data = pd.DataFrame( + { + "ue_id": [0, 1], + "tick": [0, 1], + "loc_x": [30.0, 40.0], + "loc_y": [12.0, 13.0], + } + ) + self.training_data = pd.DataFrame( + {"ue_id": [0, 1], "tick": [0, 1], "loc_x": [5.0, 10.0], "loc_y": [0.0, 1.0]} + ) + + self.mobility_params = { + "param1": {"value": 10, "type": "int"}, + "param2": {"value": 20, "type": "float"}, + } + + # Mock Bayesian Digital Twin + self.mock_bdt = MagicMock(spec=BayesianDigitalTwin) + + # Instantiate MRO object + self.mro = MRO( + self.mobility_params, self.dummy_topology, bdt={"cell_001": self.mock_bdt} + ) + self.mro.training_data = self.training_data + self.mro.prediction_data = self.prediction_data + self.mro.update_data = self.update_data + self.mro.simulation_data = self.simulation_data + + def test_update(self): # TODO: Implement AFTER PR + pass + + def test_solve(self): # TODO: Implement AFTER PR + pass + + + def test_training(self): + mro = MRO(mobility_params={}, topology=self.dummy_topology) + train_data = self.training_data.copy() + train_data.rename( + columns={"loc_x": "latitude", "loc_y": "longitude"}, inplace=True + ) + # n_iter = 5 + # for different n_inter + for n_iter in [5, 10, 20]: + loss_vs_iter = mro._training(maxiter=n_iter, train_data=train_data) + self.assertEqual(len(loss_vs_iter), len(self.dummy_topology["cell_id"])) + self.assertEqual(loss_vs_iter[0].shape[0], n_iter) + + def test_predictions(self): + # without _training() --> model not available --> empty df response + mro = MRO(mobility_params={}, topology=self.dummy_topology) + prediction_data = self.prediction_data.copy() + mro.prediction_data = prediction_data.rename( + columns={"loc_x": "latitude", "loc_y": "longitude"}, inplace=True + ) + predicted, full_prediction_df = mro._predictions(pred_data=prediction_data) + self.assertTrue(predicted.empty) + self.assertTrue(full_prediction_df.empty) + + # with _training() + topology = self.dummy_topology.copy() + topology["cell_id"] = ["cell_1", "cell_2"] + mro = MRO(mobility_params=self.mobility_params, topology=topology) + train_data = self.training_data.copy() + train_data.rename( + columns={"loc_x": "latitude", "loc_y": "longitude"}, inplace=True + ) + mro._training(20, train_data) # needed, otherwise model won't be available + prediction_data = self.prediction_data.copy() + prediction_data.rename( + columns={"loc_x": "latitude", "loc_y": "longitude"}, inplace=True + ) + predicted, full_prediction_df = mro._predictions(prediction_data) + self.assertEqual(predicted.shape, (2, 5)) + self.assertEqual(full_prediction_df.shape, (4, 15)) + + def test_prepare_all_UEs_from_all_cells_df(self): + result = self.mro._prepare_all_UEs_from_all_cells_df() + self.assertEqual(result.shape[0], 2 * 2) # 2 UEs x 2 cells + + def test_calculate_received_power(self): + dummy_distance = 1 + dummy_freq = 1800 + expected_power = -74.55545010206612 + power = self.mro._calculate_received_power( + distance_km=dummy_distance, frequency_mhz=dummy_freq + ) + self.assertEqual(expected_power, power) + + def test_preprocess_ue_topology_data(self): + result = self.mro._prepare_all_UEs_from_all_cells_df() + # Ensure that the resulting dataframe has the correct number of columns + self.assertIn("ue_id", result.columns) + self.assertIn("cell_id", result.columns) + + # Check if the combined dataframe has the correct number of rows (Cartesian product of UEs and cells) + self.assertEqual( + result.shape[0], len(self.update_data) * len(self.dummy_topology) + ) + + # Ensure that the combined dataframe has the expected values + self.assertTrue(all(result["ue_id"].isin(self.update_data["ue_id"]))) + + def test_preprocess_ue_training_data(self): + # fmt: off + expected_columns = ["ue_id","tick", "latitude", "longitude", "cell_id", "cell_lat", "cell_lon", + "cell_carrier_freq_mhz", "cell_az_deg", "log_distance", "cell_rxpwr_dbm", "relative_bearing",] + # fmt: on + self.mro.training_data.rename( + columns={"loc_x": "latitude", "loc_y": "longitude"}, inplace=True + ) + training_data = self.mro._preprocess_ue_training_data() + self.assertIsInstance(training_data, dict) + self.assertEqual(len(training_data), len(self.dummy_topology)) + for df in training_data.values(): + self.assertListEqual(list(df.columns), expected_columns) + self.assertTrue(all(df["ue_id"].isin(self.training_data["ue_id"]))) + + def test_preprocess_ue_update_data(self): + # fmt: off + expected_columns = ["ue_id","tick", "latitude", "longitude", "cell_id", "cell_lat", "cell_lon", + "cell_carrier_freq_mhz", "cell_az_deg", "log_distance", "cell_rxpwr_dbm", "relative_bearing",] + # fmt: on + self.mro.update_data.rename( + columns={"loc_x": "latitude", "loc_y": "longitude"}, inplace=True + ) + update_data = self.mro._preprocess_ue_update_data() + self.assertIsInstance(update_data, dict) + self.assertEqual(len(update_data), len(self.dummy_topology)) + for df in update_data.values(): + self.assertListEqual(list(df.columns), expected_columns) + self.assertTrue(all(df["ue_id"].isin(self.update_data["ue_id"]))) + + def test_preprocess_prediction_data(self): + # fmt: off + expected_columns = ["ue_id","tick", "latitude", "longitude", "cell_id", "cell_lat", "cell_lon", + "cell_carrier_freq_mhz", "cell_az_deg", "log_distance", "cell_rxpwr_dbm", "relative_bearing",] + # fmt: on + self.mro.prediction_data.rename( + columns={"loc_x": "latitude", "loc_y": "longitude"}, inplace=True + ) + data = self.mro._preprocess_prediction_data() + self.assertIsInstance(data, pd.DataFrame) + self.assertEqual( + len(data), len(self.dummy_topology) * len(self.mro.prediction_data["ue_id"]) + ) + self.assertListEqual(list(data.columns), expected_columns) + self.assertTrue(all(data["ue_id"].isin(self.prediction_data["ue_id"]))) + + def test_count_handovers(self): + # Define the data with 5 ticks and 2 distinct UE ids + df = pd.DataFrame({ + "loc_x": [ + -22.625309, 119.764151, 72.095437, -67.548009, 59.867089, + -22.625309, 119.764151, 72.095437, -67.548009, 59.867089, + -22.625309, 119.764151, 72.095437, -67.548009, 59.867089, + -22.625309, 119.764151, 72.095437, -67.548009, 59.867089, + -22.625309, 119.764151, 72.095437, -67.548009, 59.867089, + ], + "loc_y": [ + 59.806764, 54.857584, -20.253892, -38.100941, -83.103930, + 59.806764, 54.857584, -20.253892, -38.100941, -83.103930, + 59.806764, 54.857584, -20.253892, -38.100941, -83.103930, + 59.806764, 54.857584, -20.253892, -38.100941, -83.103930, + 59.806764, 54.857584, -20.253892, -38.100941, -83.103930, + ], + "cell_id": [ + 3.0, 3.0, 1.0, 1.0, 1.0, 3.0, 3.0, 1.0, 1.0, 1.0, + 3.0, 3.0, 1.0, 1.0, 1.0, 3.0, 3.0, 1.0, 1.0, 1.0, + 3.0, 3.0, 1.0, 1.0, 1.0, + ], + "sinr_db": [ + -2.379967, -2.327379, -2.879403, -2.681959, -1.272086, + -2.379967, -2.327379, -2.879403, -2.681959, -1.272086, + -2.379967, -2.327379, -2.879403, -2.681959, -1.272086, + -2.379967, -2.327379, -2.879403, -2.681959, -1.272086, + -2.379967, -2.327379, -2.879403, -2.681959, -1.272086, + ], + "rsrp_dbm": [ + -99.439212, -99.529860, -99.914970, -99.750036, -98.454310, + -99.439212, -99.529860, -99.914970, -99.750036, -98.454310, + -99.439212, -99.529860, -99.914970, -99.750036, -98.454310, + -99.439212, -99.529860, -99.914970, -99.750036, -98.454310, + -99.439212, -99.529860, -99.914970, -99.750036, -98.454310, + ], + "ue_id": [ + 0, 1, 0, 1, 0, 0, 1, 0, 1, 0, + 0, 1, 0, 1, 0, 0, 1, 0, 1, 0, + 0, 1, 0, 1, 0, + ], + "tick": [ + 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, + 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, + 4, 4, 4, 4, 4, + ], + }) + + ns_handover_count, nf_handover_count, no_change = count_handovers(df) + expected_ns = 18 + expected_nf = 0 + expected_no_change = 7 + self.assertEqual(ns_handover_count, expected_ns) + self.assertEqual(nf_handover_count, expected_nf) + self.assertEqual(no_change, expected_no_change) + + def test_reattach_columns(self): + self.predicted_df = pd.DataFrame( + {"tick": [1, 2, 3, 4], "loc_x": [10, 20, 30, 40], "loc_y": [5, 6, 7, 8]} + ) + self.full_prediction_df = pd.DataFrame( + { + "mock_ue_id": [100, 101, 102, 103], + "tick": [1, 2, 3, 4], + "loc_x": [10, 20, 30, 40], + "loc_y": [5, 6, 7, 8], + } + ) + full_prediction_df_no_match = pd.DataFrame( + { + "mock_ue_id": [200, 201, 202, 203], + "tick": [1, 2, 3, 4], + "loc_x": [50, 60, 70, 80], # No matching loc_x values + "loc_y": [9, 10, 11, 12], # No matching loc_y values + } + ) + + result = reattach_columns(self.predicted_df, self.full_prediction_df) + + self.assertTrue("ue_id" in result.columns) # Ensure 'ue_id' column exists + self.assertFalse( + "mock_ue_id" in result.columns + ) # Ensure 'mock_ue_id' column is removed + self.assertEqual( + result.shape[0], self.predicted_df.shape[0] + ) # Ensure size matches predicted_df + self.assertEqual(result.loc[0, "ue_id"], 100) + self.assertEqual(result.loc[1, "ue_id"], 101) + + result = reattach_columns(self.predicted_df, full_prediction_df_no_match) + self.assertTrue(result["ue_id"].isna().all()) + + def test_calculate_mro_metric(self): + data = { + "tick": [1, 2, 3, 4, 5] * 10, # 5 unique ticks, each repeated 10 times + "loc_x": range(50), # Dummy x coordinates (you can modify as needed) + "loc_y": range(50), # Dummy y coordinates (you can modify as needed) + "ue_id": range(50), # Dummy UE IDs + "sinr_db": [-3.0, -1.5, 1.0, -2.0, 0.5] * 10, # Dummy SINR values + } + dummy_pred_data = pd.DataFrame(data) + ns_count = 1 + nf_count = 2 + + d = calculate_mro_metric(ns_count, nf_count, dummy_pred_data) + real_d = 5 - (ns_count * (50 / 1000) + nf_count * (1000 / 1000)) + self.assertEqual(d, real_d) diff --git a/notebooks/radp_library.py b/notebooks/radp_library.py index 9d44da4..fa0c943 100644 --- a/notebooks/radp_library.py +++ b/notebooks/radp_library.py @@ -1133,3 +1133,53 @@ def plot_ue_tracks_on_axis(df: pd.DataFrame, ax, title: str) -> None: ax.set_title(title) ax.legend() + +#MRO app helper functions + +# Scatter plot of the Cell towers and UE Locations + +def mro_plot_scatter(df, topology): + # Create a figure and axis + plt.figure(figsize=(10, 8)) + + plt.scatter([], [], color="grey", label="RLF") + + # Define color mapping based on cell_id for both cells and UEs + color_map = {1: "red", 2: "green", 3: "blue"} + + # Plot cell towers from the topology dataframe with 'X' markers and corresponding colors + for _, row in topology.iterrows(): + color = color_map.get( + row["cell_id"], "black" + ) # Default to black if unknown cell_id + plt.scatter( + row["cell_lon"], + row["cell_lat"], + marker="x", + color=color, + s=200, + label=f"Cell {row['cell_id']}", + ) + + # Plot UEs from df without labels but with the same color coding + for _, row in df.iterrows(): + color = color_map.get( + row["cell_id"], "black" + ) # Default to black if unknown cell_id + if row["sinr_db"] < -2.9: # REMOVE COMMENT WHEN sinr_db IS FIXED + color = "grey" # Change to grey if sinr_db < 2 + + plt.scatter(row["loc_x"], row["loc_y"], color=color) + + # Add labels and title + plt.xlabel("Longitude (loc_x)") + plt.ylabel("Latitude (loc_y)") + plt.title("Cell Towers and UE Locations") + + # Create a legend for the cells only + handles, labels = plt.gca().get_legend_handles_labels() + by_label = dict(zip(labels, handles)) + plt.legend(by_label.values(), by_label.keys()) + + # Show the plot + plt.show() \ No newline at end of file diff --git a/requirements-dev.txt b/requirements-dev.txt index 8358263..8fae086 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -19,3 +19,5 @@ rasterio==1.3.5.post1 scikit-image==0.19.3 scikit-learn==1.2.1 shapely==2.0.1 +python-dotenv==1.0.1 +