Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add eval functions #124

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions makefiles/Pypi.mk
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ version-publish:

version:
poetry version patch

build:
rm -f dist/*
rm -rf src/*.egg-info
poetry build

publish:
poetry publish
Empty file added src/dg_commons/eval/__init__.py
Empty file.
60 changes: 60 additions & 0 deletions src/dg_commons/eval/comfort.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from __future__ import annotations

import numpy as np
from dg_commons import U, DgSampledSequence, iterate_with_dt
from dg_commons.seq.sequence import Timestamp


def get_max_jerk(commands: DgSampledSequence[U], t_range: tuple[Timestamp | None, Timestamp | None] = (None, None)):
"""
Get the maximum jerk of the planned action sequence.
Only timesteps within t_range are considered.
"""
max_jerk = 0
for it in iterate_with_dt(commands):
if t_range[0] is not None and it.t1 < t_range[0]:
continue
if t_range[1] is not None and it.t1 > t_range[1]:
break
jerk = (it.v1.acc - it.v0.acc) / float(it.t1 - it.t0)
if np.abs(jerk) > max_jerk:
max_jerk = np.abs(jerk)
return max_jerk


def get_acc_rms(commands: DgSampledSequence[U], t_range: tuple[Timestamp | None, Timestamp | None] = (None, None)):
"""
comfort measurement according to ISO 2631. returns the rms of frequency weighted acceletation
Only timesteps within t_range are considered.
"""
t_range_min = commands.timestamps[0] if t_range[0] is None else t_range[0]
t_range_max = commands.timestamps[-1] if t_range[1] is None else t_range[1]
acc_time = np.array(
[
command.acc
for command, timestep in zip(commands.values, commands.timestamps)
if timestep >= t_range_min and timestep <= t_range_max
]
)
st = 0.1
acc_freqs = np.fft.rfft(acc_time)
freqs = np.fft.rfftfreq(n=len(acc_time), d=st)
acc_freq_weighted = [acc_freq * acc_freq_filter(freq) for acc_freq, freq in zip(acc_freqs, freqs)]
acc_freq_weighted = np.array(acc_freq_weighted)
acc_time_weighted = np.fft.irfft(acc_freq_weighted)
acc_rms = 0
for acc in acc_time_weighted:
acc_rms += np.square(acc)
acc_rms = np.sqrt(acc_rms / len(acc_time_weighted))
return acc_rms


def acc_freq_filter(freq: float) -> float:
"""
reference:
Low order continuous-time filters for approximation of the ISO 2631-1 human vibration sensitivity weightings
:param freq:
:return: 3rd-order approximated weight for horizonal acceleration
"""
w = (14.55 * freq**2 + 6.026 * freq + 7.725) / (freq**3 + 15.02 * freq**2 + 51.63 * freq + 47.61)
return w
127 changes: 127 additions & 0 deletions src/dg_commons/eval/efficiency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
from __future__ import annotations

from math import sqrt
from typing import Optional

import numpy as np
from commonroad.scenario.lanelet import Lanelet, LaneletNetwork

from dg_commons import X, U, DgSampledSequence, iterate_with_dt
from dg_commons.geo import PoseState, SE2Transform
from dg_commons.maps.lanes import DgLanelet
from dg_commons.seq.sequence import Timestamp
from dg_commons.sim.goals import RefLaneGoal


def distance_traveled(states: DgSampledSequence[X]) -> float:
dist: float = 0
for it in iterate_with_dt(states):
dist += sqrt((it.v1.x - it.v0.x) ** 2 + (it.v1.y - it.v0.y) ** 2)
return dist


def actuation_effort(commands: DgSampledSequence[U]) -> float:
pass


def time_goal_lane_reached(
lanelet_network: LaneletNetwork,
goal_lane: RefLaneGoal,
states: DgSampledSequence[X],
pos_tol: float = 0.8,
heading_tol: float = 0.08,
) -> Timestamp | None:
reached_time = None
for idx, state in enumerate(states.values):
reached = desired_lane_reached(lanelet_network, goal_lane, state, pos_tol, heading_tol)
if reached:
reached_time = states.timestamps[idx]
break
return reached_time


def desired_lane_reached(
lanelet_network: LaneletNetwork, goal_lane: RefLaneGoal, state: X, pos_tol: float, heading_tol: float
) -> bool:
"""

:param state: the last ego state in the simulation
:param goal_lane: the desired lane
:return: True if the ego vehicle has reached the goal lane or any of its successors. Reached means the vehicle
center is close to the lane center and the heading is aligned with the lane.
"""
ego_posestate = PoseState(x=state.x, y=state.y, psi=state.psi)
ego_pose = SE2Transform.from_PoseState(ego_posestate)
ref_lane = goal_lane.ref_lane # dglanelet
lane_pose = ref_lane.lane_pose_from_SE2Transform(ego_pose)
while not lane_pose.along_inside:
if np.abs(lane_pose.along_lane) < 1.0 or np.abs(lane_pose.along_lane - ref_lane.get_lane_length()) < 1.0:
# not inside the lane but close enough
break
if lane_pose.along_after:
ref_lane = get_successor_dglane(lanelet_network, ref_lane)
if ref_lane is not None:
lane_pose = ref_lane.lane_pose_from_SE2Transform(ego_pose)
else:
break
if lane_pose.along_before:
ref_lane = get_predecessor_dglane(lanelet_network, ref_lane)
if ref_lane is not None:
lane_pose = ref_lane.lane_pose_from_SE2Transform(ego_pose)
else:
break

if goal_lane.is_fulfilled(state):
return True
# vehicle still on the road and is inside the desired lane, check its pose
if (
lane_pose.lateral_inside
and lane_pose.distance_from_center < pos_tol
and abs(lane_pose.relative_heading) < heading_tol
):
return True
else:
return False


def get_lanelet_from_dglanelet(lanelet_network: LaneletNetwork, dglanelet: DgLanelet) -> Lanelet:
ref_point = dglanelet.control_points[1].q.p
lane_id = lanelet_network.find_lanelet_by_position([ref_point])[0][0]
lanelet = lanelet_network.find_lanelet_by_id(lane_id)
return lanelet


def get_successor_lane(lanelet_network: LaneletNetwork, cur_lane: Lanelet | DgLanelet) -> Optional[Lanelet]:
# note: only one successor is considered for now
if isinstance(cur_lane, DgLanelet):
cur_lane = get_lanelet_from_dglanelet(lanelet_network, cur_lane)
if len(cur_lane.successor) > 0:
return lanelet_network.find_lanelet_by_id(cur_lane.successor[0])
else:
return None


def get_successor_dglane(lanelet_network: LaneletNetwork, cur_lane: Lanelet | DgLanelet) -> Optional[DgLanelet]:
suc_lanelet = get_successor_lane(lanelet_network, cur_lane)
if suc_lanelet is None:
return None
else:
return DgLanelet.from_commonroad_lanelet(suc_lanelet)


def get_predecessor_lane(lanelet_network: LaneletNetwork, cur_lane: Lanelet | DgLanelet) -> Optional[Lanelet]:
# note: only one predecessor is considered for now
if isinstance(cur_lane, DgLanelet):
cur_lane = get_lanelet_from_dglanelet(lanelet_network, cur_lane)
if len(cur_lane.predecessor) > 0:
return lanelet_network.find_lanelet_by_id(cur_lane.predecessor[0])
else:
return None


def get_predecessor_dglane(lanelet_network: LaneletNetwork, cur_lane: Lanelet | DgLanelet) -> Optional[DgLanelet]:
pre_lanelet = get_predecessor_lane(lanelet_network, cur_lane)
if pre_lanelet is None:
return None
else:
return DgLanelet.from_commonroad_lanelet(pre_lanelet)
Loading