|
| 1 | +from pyspi.calculator import Calculator, Data |
| 2 | +from pyspi.data import load_dataset |
| 3 | +import numpy as np |
| 4 | +import os |
| 5 | +import yaml |
| 6 | +import pytest |
| 7 | + |
| 8 | +############################# Test Calculator Object ######################## |
def test_whether_calculator_instatiates():
    """Basic test to check whether or not the calculator will instantiate."""
    # Construct with all defaults; instantiation itself is the behaviour under test.
    assert isinstance(Calculator(), Calculator), "Calculator failed to instantiate."
| 13 | + |
def test_default_calculator_instantiates_with_correct_num_spis():
    """Test whether the default calculator instantiates with the full SPI set"""
    calc = Calculator()
    n_spis_actual = calc.n_spis
    # Resolve config.yaml relative to this test file (like the config-existence
    # test below) so the test does not depend on the working directory.
    config_path = os.path.join(os.path.dirname(__file__), '..', 'pyspi', 'config.yaml')
    with open(config_path, 'rb') as y:
        yaml_file = yaml.full_load(y)
    # A base SPI with no listed variants contributes one SPI; otherwise one per variant.
    count = 0
    for module in yaml_file:
        for variants in yaml_file[module].values():
            count += 1 if variants is None else len(variants)
    assert count == n_spis_actual, f"Number of SPIs loaded from the calculator ({n_spis_actual}) does not match expected amount {count}"
| 29 | + |
@pytest.mark.parametrize("subset", ['fabfour', 'fast', 'sonnet', 'octaveless'])
def test_whether_calculator_instantiates_with_subsets(subset):
    """Test whether the calculator instantiates with each of the available subsets"""
    assert isinstance(Calculator(subset=subset), Calculator), "Calculator failed to instantiate"
| 40 | + |
def test_whether_invalid_subset_throws_error():
    """Test whether the calculator fails to instantiate with an invalid subset"""
    # A nonsense subset name should be rejected at construction time.
    with pytest.raises(ValueError) as excinfo:
        Calculator(subset='nviutw')
    assert "Subset 'nviutw' does not exist" in str(excinfo.value), "Subset not found error not displaying."
| 46 | + |
def test_whether_calculator_compute_fails_with_no_dataset():
    """Test whether the calculator fails to compute SPIs when no dataset is provided."""
    empty_calc = Calculator()
    # compute() requires a dataset to have been loaded first.
    with pytest.raises(AttributeError) as excinfo:
        empty_calc.compute()
    assert "Dataset not loaded yet" in str(excinfo.value), "Dataset not loaded yet error not displaying."
| 53 | + |
def test_calculator_name():
    """Test whether the calculator name is retrieved correctly."""
    named_calc = Calculator(name="test name")
    assert named_calc.name == "test name", "Calculator name property did not return the expected string 'test name'"
| 58 | + |
def test_calculator_labels():
    """Test whether the calculator labels are retrieved correctly, when provided."""
    test_labels = ['label1', 'label2']
    calc = Calculator(labels=test_labels)
    # Compare against a fresh literal so a shared-reference bug would be caught.
    assert calc.labels == ['label1', 'label2'], f"Calculator labels property did not return the expected list: {test_labels} "
| 64 | + |
def test_pass_single_integer_as_dataset():
    """Test whether correct error is thrown when incorrect data type passed into calculator."""
    # A bare integer is not a valid dataset and should raise a TypeError.
    with pytest.raises(TypeError) as excinfo:
        Calculator(dataset=42)
    assert "Unknown data type" in str(excinfo.value), "Incorrect data type error not displaying for integer dataset."
| 70 | + |
def test_pass_incorrect_shape_dataset_into_calculator():
    """Test whether an error is thrown when incorrect dataset shape is passed into calculator."""
    # Calculator expects a 2D (processes x observations) array; pass a 3D one.
    bad_shape_data = np.random.randn(3, 5, 10)
    with pytest.raises(RuntimeError) as excinfo:
        Calculator(dataset=bad_shape_data)
    assert "Data array dimension (3)" in str(excinfo.value), "Incorrect dimension error message not displaying for incorrect shape dataset."
| 77 | + |
@pytest.mark.parametrize("nan_loc, expected_output", [
    ([1], "[1]"),
    ([1, 2], "[1 2]"),
    ([0, 2, 3], "[0 2 3]")
    ])
def test_pass_dataset_with_nan_into_calculator(nan_loc, expected_output):
    """Check whether ValueError is raised when a dataset containing a NaN is passed into the calculator object"""
    base_dataset = np.random.randn(5, 100)
    for loc in nan_loc:
        # np.NaN was removed in NumPy 2.0 — np.nan is the canonical spelling.
        base_dataset[loc, 0] = np.nan
    with pytest.raises(ValueError) as excinfo:
        calc = Calculator(dataset=base_dataset)
    # Assert on the exception message itself (excinfo.value), not on the
    # ExceptionInfo wrapper, which is not guaranteed to stringify the message.
    assert f"non-numerics (NaNs) in processes: {expected_output}" in str(excinfo.value), "NaNs not detected in dataset when loading into Calculator!"
| 91 | + |
def test_pass_dataset_with_inf_into_calculator():
    """Check whether ValueError is raised when a dataset containing an inf/-inf value is passed into the calculator object"""
    base_dataset = np.random.randn(5, 100)
    # Both +inf and -inf are non-numeric for the Calculator's purposes.
    base_dataset[0, 1] = np.inf
    base_dataset[2, 2] = -np.inf
    with pytest.raises(ValueError) as excinfo:
        calc = Calculator(dataset=base_dataset)
    # Check the exception message (excinfo.value); the string has no
    # placeholders so it needs no f-prefix.
    assert "non-numerics (NaNs) in processes: [0 2]" in str(excinfo.value), "NaNs not detected in dataset when loading into Calculator!"
| 100 | + |
@pytest.mark.parametrize("shape, n_procs_expected, n_obs_expected", [
    ((2, 23), 2, 23),
    ((5, 4), 5, 4),
    ((100, 32), 100, 32)
])
def test_data_object_process_and_observations(shape, n_procs_expected, n_obs_expected):
    """Test whether the number of processes and observations for a given dataset is correct"""
    n_procs, n_obs = shape
    calc = Calculator(dataset=np.random.randn(n_procs, n_obs))
    assert calc.dataset.n_observations == n_obs_expected, f"Number of observations returned by Calculator ({calc.dataset.n_observations}) does not match exepected: {n_obs_expected}"
    assert calc.dataset.n_processes == n_procs_expected, f"Number of processes returned by Calculator ({calc.dataset.n_processes}) does not match exepected: {n_procs_expected}"
| 112 | + |
@pytest.mark.parametrize("yaml_filename", [
    'fabfour_config',
    'fast_config',
    'octaveless_config',
    'sonnet_config'])
def test_whether_config_files_exist(yaml_filename):
    """Check whether the config, fabfour, fast, octaveless, sonnet_config files exist"""
    # Config files live in the pyspi package directory, one level up from the tests.
    pkg_dir = os.path.join(os.path.dirname(__file__), '..', 'pyspi')
    expected_file = os.path.abspath(os.path.join(pkg_dir, f'{yaml_filename}.yaml'))
    assert os.path.isfile(expected_file), f"{yaml_filename}.yaml file was not found."
| 122 | + |
@pytest.mark.parametrize("subset, procs, obs", [
    ("all", 2, 100),
    ("all", 5, 100),
    ("fabfour", 8, 100),
    ("fast", 10, 100),
    ("sonnet", 3, 100)
])
def test_whether_table_shape_correct_before_compute(subset, procs, obs):
    """Test whether the pre-configured table is the correct shape prior to computing SPIs."""
    calc = Calculator(dataset=np.random.randn(procs, obs), subset=subset)
    # One column group of width `procs` per SPI, one row per process.
    expected_table_shape = (procs, calc.n_spis * procs)
    assert calc.table.shape == expected_table_shape, f"Calculator table ({subset}) shape: ({calc.table.shape}) does not match expected shape: {expected_table_shape}"
| 137 | + |
| 138 | +############################# Test Data Object ######################## |
def test_data_object_has_been_converted_to_numpyfloat64():
    """Test whether the data object converts passed dataset to numpy array by default."""
    calc = Calculator(dataset=np.random.randn(5, 10))
    assert calc.dataset.data_type == np.float64, "Dataset was not converted into a numpy array when loaded into Calculator."
| 144 | + |
def test_whether_data_instantiates():
    """Test whether the data object instantiates without issue."""
    assert isinstance(Data(), Data), "Data object failed to instantiate!"
| 149 | + |
def test_whether_data_throws_error_when_retrieving_nonexistent_dataset():
    """Test whether the data object throws correct message when trying to access a non-existent dataset."""
    data_obj = Data()
    # No dataset was loaded, so accessing .data should raise AttributeError.
    with pytest.raises(AttributeError) as excinfo:
        dataset = data_obj.data
    # Assert on the exception message itself (excinfo.value), not on the
    # ExceptionInfo wrapper.
    assert "'Data' object has no attribute 'data'" in str(excinfo.value), "Unexpected error message when trying to retrieve non-existent dataset!"
| 156 | + |
def test_whether_data_throws_error_when_incorrect_dataset_type():
    """Test if correct message is shown when passing invalid dataset data type into data object."""
    # A bare integer is not a valid dataset for the Data object.
    with pytest.raises(TypeError) as excinfo:
        d = Data(data=3)
    # Assert on the exception message (excinfo.value); the string has no
    # placeholders so it needs no f-prefix.
    assert "Unknown data type" in str(excinfo.value), "Incorrect error message thrown when invalid dataset loaded into data object."
| 162 | + |
@pytest.mark.parametrize("order, shape, n_procs_expected, n_obs_expected", [
    ("ps", (3, 100), 3, 100),
    ("sp", (100, 3), 3, 100)
])
def test_whether_dim_order_works(order, shape, n_procs_expected, n_obs_expected):
    """Check that ps and sp correctly specify order of process/observations"""
    rows, cols = shape
    d = Data(data=np.random.randn(rows, cols), dim_order=order)
    assert d.n_processes == n_procs_expected, f"Number of processes does not match expected for specified dim order: {order}"
    assert d.n_observations == n_obs_expected, f"Number of observations does not match expected for specified dim order: {order}"
| 173 | + |
def test_whether_data_name_assigned_only_with_dataset():
    """If no dataset is provided, there is no name for the data object (N/A)"""
    unloaded = Data(name='test')
    assert unloaded.name == 'N/A', "Data object name is not N/A when no dataset provided."
| 178 | + |
def test_whether_data_object_has_name_with_dataset():
    """If dataset is provided, the name will be returned"""
    d = Data(data=np.random.randn(4, 100), name='test')
    assert d.name == "test", f"Data object name 'test' is not being returned. Instead, {d.name} is returned."
| 184 | + |
def test_whether_data_normalise_works():
    """Check whether the data is being normalised by default when loading into data object"""
    dataset = 4 * np.random.randn(10, 500)
    d = Data(data=dataset, normalise=True)
    returned_dataset = d.to_numpy(squeeze=True)
    # Use an absolute tolerance: a relative tolerance against an expected
    # value of 0 is always zero, so approx(0, 1e-8) silently fell back to
    # the 1e-12 default abs tolerance.
    assert returned_dataset.mean() == pytest.approx(0, abs=1e-8), f"Returned dataset mean is not close to zero: {returned_dataset.mean()}"
    assert returned_dataset.std() == pytest.approx(1, 0.01), f"Returned dataset std is not close to one: {returned_dataset.std()}"
| 192 | + |
def test_whether_set_data_works():
    """Check whether existing dataset is overwritten by new dataset"""
    # Start with a 1-process dataset, then replace it with a 5-process one.
    d = Data(data=np.random.randn(1, 100))
    d.set_data(data=np.random.randn(5, 100))
    # Only the shape is checked: the new dataset is normalised on load, so it
    # will not compare equal to the array that was passed in.
    assert d.to_numpy(squeeze=True).shape[0] == 5, "Unexpected dataset returned when overwriting existing dataset!"
| 201 | + |
def test_add_univariate_process_to_existing_data_object():
    """Check that a single (univariate) process can be appended to an existing data object."""
    orig_data_object = Data(data=np.random.randn(5, 100))
    # Appending one more process should bump the count from 5 to 6.
    orig_data_object.add_process(proc=np.random.randn(1, 100))
    assert orig_data_object.n_processes == 6, "New dataset number of processes not equal to expected number of processes."
| 210 | + |
def test_add_multivariate_process_to_existing_data_object():
    """Should not work, can only add univariate process with add_process function"""
    orig_data_object = Data(data=np.random.randn(5, 100))
    # A 2-row array is multivariate and must be rejected by add_process.
    with pytest.raises(TypeError) as excinfo:
        orig_data_object.add_process(proc=np.random.randn(2, 100))
    assert "Process must be a 1D numpy array" in str(excinfo.value), "Expected 1D array error NOT thrown."
| 220 | + |
@pytest.mark.parametrize("index",
                         [[1], [1, 3], [1, 2, 3]])
def test_remove_valid_process_from_existing_dataset(index):
    """Try to remove valid processes from existing dataset by specifying one or more indices.
    Check if correct indices are being used."""
    dataset = np.random.randn(5, 100)
    # normalise=False so the stored data can be compared against the raw input.
    d = Data(data=dataset, normalise=False)
    expected_dataset = np.delete(dataset, index, axis=0)
    d.remove_process(index)
    out = d.to_numpy(squeeze=True)
    remaining = 5 - len(index)
    assert out.shape[0] == remaining, f"Dataset shape after removing {len(index)} proc(s) not equal to {remaining}"
    assert np.array_equal(expected_dataset, out), f"Expected dataset after removing proc(s): {index} not equal to dataset returned."
| 234 | + |
@pytest.mark.parametrize("dataset_name", ["forex", "cml"])
def test_load_valid_dataset(dataset_name):
    """Test whether the load_dataset function will load all available datasets."""
    loaded = load_dataset(dataset_name)
    assert isinstance(loaded, Data), f"Could not load dataset: {dataset_name}"
| 240 | + |
def test_load_invalid_dataset():
    """Test whether the load_dataset function throws the correct error/message when trying to load non-existent dataset."""
    # A dataset name that is not shipped with pyspi should raise NameError.
    with pytest.raises(NameError) as excinfo:
        load_dataset(name="test")
    assert "Unknown dataset: test" in str(excinfo.value), "Did not get expected error when loading invalid dataset."
0 commit comments