EvaluateModel.py
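"""Evaluate a trained PPO policy under Gaussian observation noise with an optional
denoiser (none, low-pass filter, or Kalman filter), reporting success rate,
collision rate, mean incompletion distance, mean reward, and mean episode length.

Example invocation (a sketch only; the model path and noise values below are
illustrative, not taken from the repository):

    python EvaluateModel.py ../models/ppo_agent.zip 0.0 0.1 kf --trials 100 --no-gui --random
"""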
import sys
sys.path.append("..")

import os
import argparse
import json
import random

import numpy as np
from stable_baselines3 import PPO
from tabulate import tabulate

from envs.utils.EnvBuilder import EnvBuilder


class NoStdStreams(object):
    """Context manager that temporarily redirects stdout/stderr to os.devnull,
    silencing verbose output during environment construction."""

    def __init__(self, stdout=None, stderr=None):
        self.devnull = open(os.devnull, 'w')
        self._stdout = stdout or self.devnull
        self._stderr = stderr or self.devnull

    def __enter__(self):
        self.old_stdout, self.old_stderr = sys.stdout, sys.stderr
        self.old_stdout.flush(); self.old_stderr.flush()
        sys.stdout, sys.stderr = self._stdout, self._stderr

    def __exit__(self, exc_type, exc_value, traceback):
        self._stdout.flush(); self._stderr.flush()
        sys.stdout = self.old_stdout
        sys.stderr = self.old_stderr
        self.devnull.close()
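
# 'evalSeeds.npy' must hold at least as many integer seeds as the number of trials
# requested. A minimal sketch for generating such a file (an assumed workflow, not
# part of this script):
#     import numpy as np
#     np.save('evalSeeds.npy', np.random.randint(0, 2**31 - 1, size=1000))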
def evaluate(mu, sigma, denoiser, modelPath, trials, gui=False, fixed=False):
    """Evaluate a trained PPO agent for `trials` episodes under Gaussian observation
    noise (mu, sigma) with the chosen denoiser, returning a table of summary metrics."""
    # Per-episode seeds: the same episode set is used for every evaluated model.
    seeds = np.load('evalSeeds.npy').tolist()
    # Pick the environment config matching the requested denoiser and obstacle mode.
    envFile = {
        'none': f'../configs/NoDenoiserEnv{"Fixed" if fixed else ""}.json',
        'lpf': f'../configs/LPFDenoiserEnv{"Fixed" if fixed else ""}.json',
        'kf': f'../configs/KFDenoiserEnv{"Fixed" if fixed else ""}.json'
    }[denoiser]
    with open(envFile, 'r') as f:
        envConfig = json.load(f)
    # Override the noise parameters and write a temporary config for the env builder.
    envConfig['noiseParameters']['mu'] = mu
    envConfig['noiseParameters']['sigma'] = sigma
    with open('tempConfigFile.json', 'w') as f:
        json.dump(envConfig, f)
    # Build the environment quietly, then clean up the temporary config file.
    with NoStdStreams():
        env = EnvBuilder.buildEnvFromConfig('tempConfigFile.json', gui=gui)
    print("Evaluating Model on")
    print(env)
    os.remove('tempConfigFile.json')

    agent = PPO.load(modelPath)
    totalTrials = trials
    successfulTrials = 0
    rewards = []
    durations = []
    nCollisions = 0
    incompleteDistances = []
    for i in range(totalTrials):
        # Seed both RNGs per episode so noise and obstacle placement are reproducible.
        np.random.seed(seeds[i])
        random.seed(seeds[i])
        done = False
        episodeReward = 0
        episodeDuration = 0
        distToTarget = []
        obs = env.reset()
        while not done:
            episodeDuration += 1
            action, _ = agent.predict(obs, deterministic=True)
            obs, reward, done, info = env.step(action)
            episodeReward += reward
            # Track distance to target (norm of the first half of the observation vector).
            distToTarget.append(np.linalg.norm(obs[:(obs.shape[0] // 2)]))
        # Classify the episode outcome reported by the environment.
        if info['success']:
            successfulTrials += 1
        elif info['reason'] == "collision":
            nCollisions += 1
        else:
            incompleteDistances.append(info['dist'])
        rewards.append(episodeReward)
        durations.append(episodeDuration)
        print(f"Trial {i+1}/{totalTrials}. Current Success Rate: {(successfulTrials/(i+1))*100:.2f}% ",
              end="\r", flush=True, file=sys.stderr)
    env.close()
    print()
    # Aggregate per-episode statistics into human-readable metrics.
    evaluationResults = {
        'Success Rate': f"{successfulTrials/totalTrials * 100:.2f}%",
        'Collision Rate': f"{nCollisions/totalTrials * 100:.2f}%",
        'Mean Incompletion Distance': f"{sum(incompleteDistances)/len(incompleteDistances):.2f}m" if len(incompleteDistances) > 0 else "N/A",
        'Mean Reward': f"{sum(rewards)/len(rewards):.2f}",
        'Mean Episode Length': f"{sum(durations)/len(durations):.2f}"
    }
    evaluationTable = [[k, v] for k, v in evaluationResults.items()]
    return evaluationTable


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("modelPath", help="Path to the model", type=str)
    parser.add_argument("mu", help="Mean of the observation noise", type=float)
    parser.add_argument("sigma", help="Standard deviation of the observation noise", type=float)
    parser.add_argument("denoiser", help="Denoiser to use", choices=['none', 'lpf', 'kf'], type=str)
    parser.add_argument("-t", "--trials", type=int, default=10, help="Number of episodes to evaluate for.")
    # By default (no flag given) the GUI is disabled and obstacles are randomized.
    parser.add_argument('--gui', action='store_true', help='Enable GUI')
    parser.add_argument('--no-gui', action='store_false', dest='gui', help='Disable GUI')
    parser.add_argument('--fixed', action='store_true', help='Use fixed obstacles')
    parser.add_argument('--random', action='store_false', dest='fixed', help='Use randomized obstacles')
    args = parser.parse_args()

    evaluationTable = evaluate(**vars(args))
    print()
    print(tabulate(evaluationTable, headers=["Metric", "Value"], tablefmt='github'))