
Commit a1dec9c

Merge pull request #4 from fidelity/parallel
Benchmark Parallelization
2 parents: 98179a9 + 6a39bad

File tree: 5 files changed (+253, −31 lines)


CHANGELOG.txt

Lines changed: 6 additions & 0 deletions
@@ -2,6 +2,12 @@
 CHANGELOG
 =========
 
+-------------------------------------------------------------------------------
+June, 16, 2021 1.1.0
+-------------------------------------------------------------------------------
+
+- Parallelize benchmark function.
+
 -------------------------------------------------------------------------------
 March, 23, 2021 1.0.1
 -------------------------------------------------------------------------------

feature/_version.py

Lines changed: 1 addition & 1 deletion
@@ -2,4 +2,4 @@
 # Copyright FMR LLC <[email protected]>
 # SPDX-License-Identifier: GNU GPLv3
 
-__version__ = "1.0.1"
+__version__ = "1.1.0"

feature/selector.py

Lines changed: 85 additions & 30 deletions
@@ -5,18 +5,20 @@
 
 """
 :Author: FMR LLC
-:Version: 1.0.0 of August 10, 2020
+:Version: 1.1.0 of June 16, 2021
 
 This module defines the public interface of the **Selective Library** for feature selection.
 """
 
+import multiprocessing as mp
 from time import time
 from typing import Dict, Union, NamedTuple, NoReturn, Tuple, Optional
 
 import numpy as np
 import pandas as pd
 import seaborn as sns
 from catboost import CatBoostClassifier, CatBoostRegressor
+from joblib import Parallel, delayed
 from lightgbm import LGBMClassifier, LGBMRegressor
 from sklearn.ensemble import AdaBoostClassifier, AdaBoostRegressor
 from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
@@ -439,7 +441,7 @@ def _validate_args(seed, selection_method) -> NoReturn:
                                              SelectionMethod.TreeBased,
                                              SelectionMethod.Statistical,
                                              SelectionMethod.Variance)),
-               TypeError("Unknown selection type: " + str(selection_method)))
+               TypeError("Unknown selection type: " + str(selection_method) + " " + str(type(selection_method))))
 
     # Selection method value
     selection_method._validate()
@@ -480,6 +482,7 @@ def benchmark(selectors: Dict[str, Union[SelectionMethod.Correlation,
              output_filename: Optional[str] = None,
              drop_zero_variance_features: Optional[bool] = True,
              verbose: bool = False,
+             n_jobs: int = 1,
              seed: int = Constants.default_seed) \
        -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
@@ -507,6 +510,10 @@ def benchmark(selectors: Dict[str, Union[SelectionMethod.Correlation,
        Whether to drop features with zero variance before running feature selector methods or not.
    verbose: bool, optional (default=False)
        Whether to print progress messages or not.
+   n_jobs: int, optional (default=1)
+       Number of concurrent processes/threads to use in parallelized routines.
+       If set to -1, all CPUs are used.
+       If set to -2, all CPUs but one are used, and so on.
    seed: int, optional (default=Constants.default_seed)
        The random seed to initialize the random number generator.
 
@@ -525,7 +532,8 @@ def benchmark(selectors: Dict[str, Union[SelectionMethod.Correlation,
                                                    labels=labels,
                                                    output_filename=output_filename,
                                                    drop_zero_variance_features=drop_zero_variance_features,
-                                                   verbose=verbose)
+                                                   verbose=verbose,
+                                                   n_jobs=n_jobs)
    else:
 
        # Create K-Fold object
@@ -555,7 +563,8 @@ def benchmark(selectors: Dict[str, Union[SelectionMethod.Correlation,
                                                                labels=train_labels,
                                                                output_filename=output_filename,
                                                                drop_zero_variance_features=drop_zero_variance_features,
-                                                               verbose=False)
+                                                               verbose=False,
+                                                               n_jobs=n_jobs)
 
            # Concatenate data frames
            score_df = pd.concat((score_df, score_cv_df))
@@ -577,7 +586,8 @@ def _bench(selectors: Dict[str, Union[SelectionMethod.Correlation,
           labels: Optional[pd.Series] = None,
           output_filename: Optional[str] = None,
           drop_zero_variance_features: Optional[bool] = True,
-          verbose: bool = False) \
+          verbose: bool = False,
+          n_jobs: int = 1) \
        -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Benchmark with a given set of feature selectors.
@@ -591,7 +601,7 @@ def _bench(selectors: Dict[str, Union[SelectionMethod.Correlation,
    check_true(selectors is not None, ValueError("Benchmark selectors cannot be none."))
    check_true(data is not None, ValueError("Benchmark data cannot be none."))
 
-   # Output files
+   # Output file
    if output_filename is not None:
        output_file = open(output_filename, "a")
    else:
@@ -605,39 +615,84 @@
    method_to_runtime = {}
    score_df = pd.DataFrame(index=data.columns)
    selected_df = pd.DataFrame(index=data.columns)
-   for method_name, method in selectors.items():
-       selector = Selective(method)
-       t0 = time()
-       if verbose:
-           print("\n>>> Running", method_name)
-       scores = None
-       selected = []
-       try:
-           subset = selector.fit_transform(data, labels)
-           scores = selector.get_absolute_scores()
-           selected = [1 if c in subset.columns else 0 for c in data.columns]
-           method_to_runtime[method_name] = round((time() - t0) / 60, 2)
-       except Exception as exp:
-           print("Exception", exp)
-           scores = np.repeat(0, len(data.columns))
-           selected = np.repeat(0, len(data.columns))
-           method_to_runtime[method_name] = str(round((time() - t0) / 60, 2)) + " (exception)"
-       finally:
-           score_df[method_name] = scores
-           selected_df[method_name] = selected
+
+   # Find the effective number of jobs
+   size = len(selectors.items())
+   if n_jobs < 0:
+       n_jobs = max(mp.cpu_count() + 1 + n_jobs, 1)
+   n_jobs = min(n_jobs, size)
+
+   # Parallel benchmarks for each method
+   output_list = Parallel(n_jobs=n_jobs, require="sharedmem")(
+       delayed(_parallel_bench)(
+           data, labels, method_name, method, verbose)
+       for method_name, method in selectors.items())
+
+   # Collect the output from each method
+   for output in output_list:
+       for method_name, results_dict in output.items():
+           score_df[method_name] = results_dict["scores"]
+           selected_df[method_name] = results_dict["selected"]
+           method_to_runtime[method_name] = results_dict["runtime"]
+
            if output_filename is not None:
                output_file.write(method_name + " " + str(method_to_runtime[method_name]) + "\n")
-               output_file.write(str(selected) + "\n")
-               output_file.write(str(scores) + "\n")
-           if verbose:
-               print(f"<<< Done! Time taken: {(time() - t0) / 60:.2f} minutes")
+               output_file.write(str(results_dict["selected"]) + "\n")
+               output_file.write(str(results_dict["scores"]) + "\n")
 
    # Format
    runtime_df = pd.Series(method_to_runtime).to_frame("runtime").rename_axis("method").reset_index()
 
    return score_df, selected_df, runtime_df
 
 
+def _parallel_bench(data: pd.DataFrame,
+                    labels: Optional[pd.Series],
+                    method_name: str,
+                    method: Union[SelectionMethod.Correlation,
+                                  SelectionMethod.Linear,
+                                  SelectionMethod.TreeBased,
+                                  SelectionMethod.Statistical,
+                                  SelectionMethod.Variance],
+                    verbose: bool) \
+        -> Dict[str, Dict[str, Union[pd.DataFrame, list, float]]]:
+    """
+    Benchmark with a single given feature selector.
+    Return a dictionary of the feature selection method name with its corresponding scores,
+    selected features and runtime.
+
+    Returns
+    -------
+    Dictionary of the feature selection method name with its corresponding scores, selected features
+    and runtime.
+    """
+
+    selector = Selective(method)
+    t0 = time()
+    if verbose:
+        run_str = "\n>>> Running " + method_name
+        print(run_str, flush=True)
+
+    try:
+        subset = selector.fit_transform(data, labels)
+        scores = selector.get_absolute_scores()
+        selected = [1 if c in subset.columns else 0 for c in data.columns]
+        runtime = round((time() - t0) / 60, 2)
+    except Exception as exp:
+        print("Exception", exp)
+        scores = np.repeat(0, len(data.columns))
+        selected = np.repeat(0, len(data.columns))
+        runtime = str(round((time() - t0) / 60, 2)) + " (exception)"
+    finally:
+        if verbose:
+            done_str = f"<<< Done! {method_name} Time taken: {(time() - t0) / 60:.2f} minutes"
+            print(done_str, flush=True)
+
+    results_dict = {"scores": scores, "selected": selected, "runtime": runtime}
+
+    return {method_name: results_dict}
+
+
 def calculate_statistics(scores: pd.DataFrame,
                          selected: pd.DataFrame,
                          columns: Optional[list] = None,
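
Note on the pattern: _bench now fans the selectors out with joblib's Parallel/delayed, and maps negative n_jobs values to CPU counts the way scikit-learn does. For example, on an 8-CPU machine, n_jobs=-2 yields max(8 + 1 - 2, 1) = 7 workers, which is then capped at the number of selectors. Below is a minimal, self-contained sketch of that pattern, with a hypothetical run_one task standing in for _parallel_bench (not part of the library):

    import multiprocessing as mp

    from joblib import Parallel, delayed


    def run_one(name, payload):
        # Stand-in for _parallel_bench: do some work, return {name: result}.
        return {name: sum(payload)}


    def run_all(tasks, n_jobs=1):
        # Map negative n_jobs to CPU counts: -1 -> all CPUs, -2 -> all but one.
        size = len(tasks)
        if n_jobs < 0:
            n_jobs = max(mp.cpu_count() + 1 + n_jobs, 1)
        # Never spawn more workers than there are tasks.
        n_jobs = min(n_jobs, size)

        # require="sharedmem" selects joblib's threading backend, so workers
        # share the parent's memory instead of pickling data to subprocesses.
        output_list = Parallel(n_jobs=n_jobs, require="sharedmem")(
            delayed(run_one)(name, payload) for name, payload in tasks.items())

        # Merge the per-task dictionaries into a single result mapping.
        results = {}
        for output in output_list:
            results.update(output)
        return results


    print(run_all({"a": [1, 2], "b": [3, 4]}, n_jobs=-1))  # {'a': 3, 'b': 7}

The sharedmem requirement trades process-level parallelism for zero-copy access to the benchmark data, which is consistent with the tests below expecting identical scores across n_jobs settings.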

requirements.txt

Lines changed: 1 addition & 0 deletions
@@ -1,4 +1,5 @@
 catboost
+joblib
 lightgbm
 minepy
 numpy

tests/test_parallel.py

Lines changed: 160 additions & 0 deletions
@@ -0,0 +1,160 @@
+# -*- coding: utf-8 -*-
+# Copyright FMR LLC <[email protected]>
+# SPDX-License-Identifier: GNU GPLv3
+
+from catboost import CatBoostClassifier, CatBoostRegressor
+from lightgbm import LGBMClassifier, LGBMRegressor
+from sklearn.datasets import load_boston, load_iris
+from sklearn.ensemble import AdaBoostClassifier, AdaBoostRegressor
+from sklearn.ensemble import ExtraTreesClassifier, ExtraTreesRegressor
+from sklearn.ensemble import GradientBoostingClassifier, GradientBoostingRegressor
+from xgboost import XGBClassifier, XGBRegressor
+
+from feature.utils import get_data_label
+from feature.selector import SelectionMethod, benchmark
+from tests.test_base import BaseTest
+
+
+class TestParallel(BaseTest):
+
+    num_features = 3
+    corr_threshold = 0.5
+    alpha = 1000
+    tree_params = {"random_state": 123, "n_estimators": 100}
+
+    selectors = {
+        "corr_pearson": SelectionMethod.Correlation(corr_threshold, method="pearson"),
+        "corr_kendall": SelectionMethod.Correlation(corr_threshold, method="kendall"),
+        "corr_spearman": SelectionMethod.Correlation(corr_threshold, method="spearman"),
+        "univ_anova": SelectionMethod.Statistical(num_features, method="anova"),
+        "univ_chi_square": SelectionMethod.Statistical(num_features, method="chi_square"),
+        "univ_mutual_info": SelectionMethod.Statistical(num_features, method="mutual_info"),
+        "linear": SelectionMethod.Linear(num_features, regularization="none"),
+        "lasso": SelectionMethod.Linear(num_features, regularization="lasso", alpha=alpha),
+        "ridge": SelectionMethod.Linear(num_features, regularization="ridge", alpha=alpha),
+        "random_forest": SelectionMethod.TreeBased(num_features),
+        "xgboost_clf": SelectionMethod.TreeBased(num_features, estimator=XGBClassifier(**tree_params)),
+        "xgboost_reg": SelectionMethod.TreeBased(num_features, estimator=XGBRegressor(**tree_params)),
+        "extra_clf": SelectionMethod.TreeBased(num_features, estimator=ExtraTreesClassifier(**tree_params)),
+        "extra_reg": SelectionMethod.TreeBased(num_features, estimator=ExtraTreesRegressor(**tree_params)),
+        "lgbm_clf": SelectionMethod.TreeBased(num_features, estimator=LGBMClassifier(**tree_params)),
+        "lgbm_reg": SelectionMethod.TreeBased(num_features, estimator=LGBMRegressor(**tree_params)),
+        "gradient_clf": SelectionMethod.TreeBased(num_features, estimator=GradientBoostingClassifier(**tree_params)),
+        "gradient_reg": SelectionMethod.TreeBased(num_features, estimator=GradientBoostingRegressor(**tree_params)),
+        "adaboost_clf": SelectionMethod.TreeBased(num_features, estimator=AdaBoostClassifier(**tree_params)),
+        "adaboost_reg": SelectionMethod.TreeBased(num_features, estimator=AdaBoostRegressor(**tree_params)),
+        "catboost_clf": SelectionMethod.TreeBased(num_features, estimator=CatBoostClassifier(**tree_params, silent=True)),
+        "catboost_reg": SelectionMethod.TreeBased(num_features, estimator=CatBoostRegressor(**tree_params, silent=True))
+    }
+
+    def test_benchmark_regression(self):
+        data, label = get_data_label(load_boston())
+        data = data.drop(columns=["CHAS", "NOX", "RM", "DIS", "RAD", "TAX", "PTRATIO", "INDUS"])
+
+        # Benchmark
+        score_df_sequential, selected_df_sequential, runtime_df_sequential = benchmark(self.selectors, data, label)
+        score_df_p1, selected_df_p1, runtime_df_p1 = benchmark(self.selectors, data, label, verbose=True, n_jobs=1)
+        score_df_p2, selected_df_p2, runtime_df_p2 = benchmark(self.selectors, data, label, verbose=True, n_jobs=2)
+
+        # Scores
+        self.assertListAlmostEqual([0.069011, 0.054086, 0.061452, 0.006510, 0.954662],
+                                   score_df_sequential["linear"].to_list())
+        self.assertListAlmostEqual([0.056827, 0.051008, 0.053192, 0.007176, 0.923121],
+                                   score_df_sequential["lasso"].to_list())
+
+        self.assertListAlmostEqual(score_df_sequential["linear"].to_list(), score_df_p1["linear"].to_list())
+        self.assertListAlmostEqual(score_df_sequential["linear"].to_list(), score_df_p2["linear"].to_list())
+        self.assertListAlmostEqual(score_df_sequential["lasso"].to_list(), score_df_p1["lasso"].to_list())
+        self.assertListAlmostEqual(score_df_sequential["lasso"].to_list(), score_df_p2["lasso"].to_list())
+
+        # Selected
+        self.assertListEqual([1, 0, 1, 0, 1], selected_df_sequential["linear"].to_list())
+        self.assertListEqual([1, 0, 1, 0, 1], selected_df_sequential["lasso"].to_list())
+
+        self.assertListEqual(selected_df_sequential["linear"].to_list(), selected_df_p1["linear"].to_list())
+        self.assertListEqual(selected_df_sequential["linear"].to_list(), selected_df_p2["linear"].to_list())
+        self.assertListEqual(selected_df_sequential["lasso"].to_list(), selected_df_p1["lasso"].to_list())
+        self.assertListEqual(selected_df_sequential["lasso"].to_list(), selected_df_p2["lasso"].to_list())
+
+    def test_benchmark_classification(self):
+        data, label = get_data_label(load_iris())
+
+        # Benchmark
+        score_df_sequential, selected_df_sequential, runtime_df_sequential = benchmark(self.selectors, data, label)
+        score_df_p1, selected_df_p1, runtime_df_p1 = benchmark(self.selectors, data, label, n_jobs=1)
+        score_df_p2, selected_df_p2, runtime_df_p2 = benchmark(self.selectors, data, label, n_jobs=2)
+
+        # Scores
+        self.assertListAlmostEqual([0.289930, 0.560744, 0.262251, 0.042721],
+                                   score_df_sequential["linear"].to_list())
+        self.assertListAlmostEqual([0.764816, 0.593482, 0.365352, 1.015095],
+                                   score_df_sequential["lasso"].to_list())
+
+        self.assertListAlmostEqual(score_df_sequential["linear"].to_list(), score_df_p1["linear"].to_list())
+        self.assertListAlmostEqual(score_df_sequential["linear"].to_list(), score_df_p2["linear"].to_list())
+        self.assertListAlmostEqual(score_df_sequential["lasso"].to_list(), score_df_p1["lasso"].to_list())
+        self.assertListAlmostEqual(score_df_sequential["lasso"].to_list(), score_df_p2["lasso"].to_list())
+
+        # Selected
+        self.assertListEqual([1, 1, 1, 0], selected_df_sequential["linear"].to_list())
+        self.assertListEqual([1, 1, 0, 1], selected_df_sequential["lasso"].to_list())
+
+        self.assertListEqual(selected_df_sequential["linear"].to_list(), selected_df_p1["linear"].to_list())
+        self.assertListEqual(selected_df_sequential["linear"].to_list(), selected_df_p2["linear"].to_list())
+        self.assertListEqual(selected_df_sequential["lasso"].to_list(), selected_df_p1["lasso"].to_list())
+        self.assertListEqual(selected_df_sequential["lasso"].to_list(), selected_df_p2["lasso"].to_list())
+
+    def test_benchmark_regression_cv(self):
+        data, label = get_data_label(load_boston())
+        data = data.drop(columns=["CHAS", "NOX", "RM", "DIS", "RAD", "TAX", "PTRATIO", "INDUS"])
+
+        # Benchmark
+        score_df_sequential, selected_df_sequential, runtime_df_sequential = benchmark(self.selectors, data, label,
+                                                                                       cv=5, output_filename=None)
+        score_df_p1, selected_df_p1, runtime_df_p1 = benchmark(self.selectors, data, label, cv=5,
+                                                               output_filename=None, n_jobs=1)
+        score_df_p2, selected_df_p2, runtime_df_p2 = benchmark(self.selectors, data, label, cv=5,
+                                                               output_filename=None, n_jobs=2)
+
+        # Aggregate scores from different cv-folds
+        score_df_sequential = score_df_sequential.groupby(score_df_sequential.index).mean()
+        score_df_p1 = score_df_p1.groupby(score_df_p1.index).mean()
+        score_df_p2 = score_df_p2.groupby(score_df_p2.index).mean()
+
+        # Scores
+        self.assertListAlmostEqual([0.061577, 0.006446, 0.066933, 0.957603, 0.053797],
+                                   score_df_sequential["linear"].to_list())
+        self.assertListAlmostEqual([0.053294, 0.007117, 0.054563, 0.926039, 0.050716],
+                                   score_df_sequential["lasso"].to_list())
+
+        self.assertListAlmostEqual(score_df_sequential["linear"].to_list(), score_df_p1["linear"].to_list())
+        self.assertListAlmostEqual(score_df_sequential["linear"].to_list(), score_df_p2["linear"].to_list())
+        self.assertListAlmostEqual(score_df_sequential["lasso"].to_list(), score_df_p1["lasso"].to_list())
+        self.assertListAlmostEqual(score_df_sequential["lasso"].to_list(), score_df_p2["lasso"].to_list())
+
+    def test_benchmark_classification_cv(self):
+        data, label = get_data_label(load_iris())
+
+        # Benchmark
+        score_df_sequential, selected_df_sequential, runtime_df_sequential = benchmark(self.selectors, data, label,
+                                                                                       cv=5, output_filename=None)
+        score_df_p1, selected_df_p1, runtime_df_p1 = benchmark(self.selectors, data, label, cv=5,
+                                                               output_filename=None, n_jobs=1)
+        score_df_p2, selected_df_p2, runtime_df_p2 = benchmark(self.selectors, data, label, cv=5,
+                                                               output_filename=None, n_jobs=2)
+
+        # Aggregate scores from different cv-folds
+        score_df_sequential = score_df_sequential.groupby(score_df_sequential.index).mean()
+        score_df_p1 = score_df_p1.groupby(score_df_p1.index).mean()
+        score_df_p2 = score_df_p2.groupby(score_df_p2.index).mean()
+
+        # Scores
+        self.assertListAlmostEqual([0.223276, 0.035431, 0.262547, 0.506591],
+                                   score_df_sequential["linear"].to_list())
+        self.assertListAlmostEqual([0.280393, 0.948935, 0.662777, 0.476188],
+                                   score_df_sequential["lasso"].to_list())
+
+        self.assertListAlmostEqual(score_df_sequential["linear"].to_list(), score_df_p1["linear"].to_list())
+        self.assertListAlmostEqual(score_df_sequential["linear"].to_list(), score_df_p2["linear"].to_list())
+        self.assertListAlmostEqual(score_df_sequential["lasso"].to_list(), score_df_p1["lasso"].to_list())
+        self.assertListAlmostEqual(score_df_sequential["lasso"].to_list(), score_df_p2["lasso"].to_list())
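
For reference, a minimal end-to-end call of the parallelized benchmark, mirroring what these tests exercise (the two selectors and their thresholds here are illustrative, not a recommended configuration):

    from sklearn.datasets import load_iris

    from feature.utils import get_data_label
    from feature.selector import SelectionMethod, benchmark

    data, label = get_data_label(load_iris())

    selectors = {
        "corr_pearson": SelectionMethod.Correlation(0.5, method="pearson"),
        "univ_anova": SelectionMethod.Statistical(3, method="anova"),
    }

    # n_jobs=-1 uses all CPUs; the scores should match a sequential run.
    score_df, selected_df, runtime_df = benchmark(selectors, data, label, n_jobs=-1)
    print(runtime_df)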
