diff --git a/src/raspi/component/can/can_bus.py b/src/raspi/component/can/can_bus.py index be4067d..6366224 100644 --- a/src/raspi/component/can/can_bus.py +++ b/src/raspi/component/can/can_bus.py @@ -1,64 +1,70 @@ import can -import cantools -import math -import time import logging +from typing import Callable +from dataclasses import dataclass, field -class CanBus: - bus: can.Bus - db: object +from state_management import ( + create_generic_context, + device, + device_action, + device_parser, +) - def __init__(self,channel = "can0",interface = "socketcan",config_context = None, ignore_config = False, **kwargs): - self.db = cantools.database.load_file("src/raspi/odrive-cansimple.dbc") - self.bus = can.Bus(channel,interface,config_context,ignore_config,**kwargs) - +DEFAULT_BUS_CONFIG = {"channel": "can0", "interface": "socketcan", "config_context": None, "ignore_config": False} -# TODO: Move MotorController into it's own file. -class MotorController: - axis_id: int - can: CanBus +@device +@dataclass(slots = True) +class CanBus: + busConfig : dict = field(default_factory=dict) + bus: can.Bus = None + notifier : can.Notifier = None + listeners : dict = field(default_factory=dict) + def __post_init__(self): + self.busConfig = {**DEFAULT_BUS_CONFIG, **self.busConfig} + self.bus = self.bus if self.bus else can.Bus(**self.busConfig) + self.notifier = self.notifier if self.notifier else can.Notifier(self.bus, [lambda msg: read_message(self, msg)]) - def __init__(self, can: CanBus, axis_id: int): - self.axis_id = axis_id - self.can = can +def read_message(can_bus: CanBus, msg) -> None: + axisId = msg.arbitration_id >> 5 + logging.info("Received message with id " + hex(msg.arbitration_id)) + if axisId in can_bus.listeners: + can_bus.listeners[axisId](msg) + else: + logging.warning("Unhandled message with id " + hex(msg.arbitration_id)) - - def send_message(self, message_name: str, message_data: dict): - """ - Send a message to this motor controller's can bus. - """ - # create message - msg = self.can.db.get_message_by_name(message_name) - data = msg.encode(message_data) - msg = self.can.Message(arbitration_id = msg.frame_id | self.axis_id << 5, is_extended_id = False, data = data) - logging.info(self.can.db.decode_message(message_name, msg.data)) - logging.info(msg) +ctx = create_generic_context("can_bus", [CanBus]) - # send message - try: - self.can.bus.send(msg) - logging.info("Message sent on {}".format(self.can.bus.channel_info)) - except self.can.CanError: - logging.error("Message NOT sent! Please verify can0 is working first") - +@device_parser(ctx) +def parse_can_bus(data: dict) -> CanBus: + return CanBus(**data) - def receive_single_message(self): - """ - Recieve a single message (likely not the expected one), depends on how can.recv() works - """ - msg = self.can.recv() - logging.info(msg) - return msg +@device_action(ctx) +def send_message(bus: CanBus, arbitration_id: int, data, is_extended_id: bool = False) -> bool: + """Sends a message through the can bus. Returns whether the send was successful.""" + msg = can.Message(arbitration_id=arbitration_id, is_extended_id = is_extended_id, data = data) + try: + bus.bus.send(msg) + return True + except can.CanError: + logging.error("Unable to send message on CAN bus.") + return False +@device_action(ctx) +def send_message_recurring(bus: CanBus, arbitration_id: int, data, period: float, is_extended_id: bool = False, duration: float = None, modifier_callback: Callable[[object], None] = None) -> object: + """Repeatedly sends a message through the can bus. Returns a task for it.""" + msg = can.Message(arbitration_id=arbitration_id, is_extended_id = is_extended_id, data = data) + return bus.send_periodic(msg, period, duration, modifier_callback=modifier_callback) - def recieve_message_by_name(self,message_name): - """ - Receive a message by name from can.recv(), depends on how can.recv() works - """ - while True: - msg = self.bus.recv() - if msg.arbitration_id == ((self.axis_id << 5) | self.can.bus.db.get_message_by_name(message_name).frame_id): - logging.info(msg) - return msg +@device_action(ctx) +def add_listener(bus: CanBus, axisID: int, callback: Callable[[object], None]) -> None: + """Adds a listener to check for messages on a certain axis. There should be one listener per axis.""" + if axisID in bus.listeners: + logging.warning("Attempting to register multiple listeners for the same CAN Axis when only one allowed. Overriding old listener.") + bus.listeners[axisID] = callback +@device_action(ctx) +def close(bus: CanBus) -> None: + logging.info("Closing CAN Bus") + bus.notifier.stop() + bus.bus.shutdown() diff --git a/src/raspi/component/motor/__init__.py b/src/raspi/component/motor/__init__.py index 3189a32..0e015c2 100644 --- a/src/raspi/component/motor/__init__.py +++ b/src/raspi/component/motor/__init__.py @@ -1,2 +1,3 @@ from . import raw_motor as raw_motor_action +from . import odrive_motor as odrive_motor_action from .pin import * diff --git a/src/raspi/odrive-cansimple.dbc b/src/raspi/component/motor/odrive-cansimple.dbc similarity index 100% rename from src/raspi/odrive-cansimple.dbc rename to src/raspi/component/motor/odrive-cansimple.dbc diff --git a/src/raspi/component/motor/odrive_enums.py b/src/raspi/component/motor/odrive_enums.py new file mode 100644 index 0000000..da83c4a --- /dev/null +++ b/src/raspi/component/motor/odrive_enums.py @@ -0,0 +1,107 @@ +from enum import IntEnum, IntFlag + +class MotorState(IntEnum): + UNDEFINED = 0 + IDLE = 1 + STARTUP_SEQUENCE = 2 + FULL_CALIBRATION_SEQUENCE = 3 + MOTOR_CALIBRATION = 4 + ENCODER_INDEX_SEARCH = 6 + ENCODER_OFFSET_CALIBRATION = 7 + CLOSED_LOOP_CONTROL = 8 + LOCKIN_SPIN = 9 + ENCODER_DIR_FIND = 10 + HOMING = 11 + ENCODER_HALL_POLARITY_CALIBRATION = 12 + ENCODER_HALL_PHASE_CALIBRATION = 13 + +class InputMode(IntEnum): + INACTIVE = 0 + PASSTHROUGH = 1 + VEL_RAMP = 2 + POS_FILTER = 3 + MIX_CHANNELS = 4 + TRAP_TRAJ = 5 + TORQUE_RAMP = 6 + MIRROR = 7 + TUNING = 8 + +class ControlMode(IntEnum): + VOLTAGE_CONTROL = 0 + TORQUE_CONTROL = 1 + VELOCITY_CONTROL = 2 + POSITION_CONTROL = 3 + +class AxisError(IntFlag): + NONE = 0 + INVALID_STATE = 1 + MOTOR_FAILED = 64 + SENSORLESS_ESTIMATOR_FAILED = 128 + ENCODER_FAILED = 256 + CONTROLLER_FAILED = 512 + WATCHDOG_TIMER_EXPIRED = 2048 + MIN_ENDSTOP_PRESSED = 4096 + MAX_ENDSTOP_PRESSED = 8192 + ESTOP_REQUESTED = 16384 + HOMING_WITHOUT_ENDSTOP = 131072 + OVER_TEMP = 262144 + UNKNOWN_POSITION = 524288 + +class MotorError(IntFlag): + NONE = 0 + PHASE_RESISTANCE_OUT_OF_RANGE = 1 + PHASE_INDUCTANCE_OUT_OF_RANGE = 2 + DRV_FAULT = 8 + CONTROL_DEADLINE_MISSED = 16 + MODULATION_MAGNITUDE = 128 + CURRENT_SENSE_SATURATION = 1024 + CURRENT_LIMIT_VIOLATION = 4096 + MODULATION_IS_NAN = 65536 + MOTOR_THERMISTOR_OVER_TEMP = 131072 + FET_THERMISTOR_OVER_TEMP = 262144 + TIMER_UPDATE_MISSED = 524288 + CURRENT_MEASUREMENT_UNAVAILABLE = 1048576 + CONTROLLER_FAILED = 2097152 + I_BUS_OUT_OF_RANGE = 4194304 + BRAKE_RESISTOR_DISARMED = 8388608 + SYSTEM_LEVEL = 16777216 + BAD_TIMING = 33554432 + UNKNOWN_PHASE_ESTIMATE = 67108864 + UNKNOWN_PHASE_VEL = 134217728 + UNKNOWN_TORQUE = 268435456 + UNKNOWN_CURRENT_COMMAND = 536870912 + UNKNOWN_CURRENT_MEASUREMENT = 1073741824 + UNKNOWN_VBUS_VOLTAGE = 2147483648 + UNKNOWN_VOLTAGE_COMMAND = 4294967296 + UNKNOWN_GAINS = 8589934592 + CONTROLLER_INITIALIZING = 17179869184 + UNBALANCED_PHASES = 34359738368 + +class EncoderError(IntFlag): + NONE = 0 + UNSTABLE_GAIN = 1 + CPR_POLEPAIRS_MISMATCH = 2 + NO_RESPONSE = 4 + UNSUPPORTED_ENCODER_MODE = 8 + ILLEGAL_HALL_STATE = 16 + INDEX_NOT_FOUND_YET = 32 + ABS_SPI_TIMEOUT = 64 + ABS_SPI_COM_FAIL = 128 + ABS_SPI_NOT_READY = 256 + HALL_NOT_CALIBRATED_YET = 512 + +class SensorlessError(IntFlag): + NONE = 0 + UNSTABLE_GAIN = 1 + UNKNOWN_CURRENT_MEASUREMENT = 2 + +class ControllerError(IntFlag): + NONE = 0 + OVERSPEED = 1 + INVALID_INPUT_MODE = 2 + UNSTABLE_GAIN = 4 + INVALID_MIRROR_AXIS = 8 + INVALID_LOAD_ENCODER = 16 + INVALID_ESTIMATE = 32 + INVALID_CIRCULAR_RANGE = 64 + SPINOUT_DETECTED = 128 diff --git a/src/raspi/component/motor/odrive_motor.py b/src/raspi/component/motor/odrive_motor.py new file mode 100644 index 0000000..3e418d5 --- /dev/null +++ b/src/raspi/component/motor/odrive_motor.py @@ -0,0 +1,204 @@ +import cantools +import math +import time +import logging +from dataclasses import dataclass +from component.motor.odrive_enums import * + +import component.can.can_bus as can_bus + +from state_management import ( + create_generic_context, + device, + device_action, + device_parser, + identifier +) + +ODRIVE_CAN_DB = cantools.database.load_file("src/raspi/component/motor/odrive-cansimple.dbc") + +@device +@dataclass +class ODriveMotor: + axisID: int + control_mode: ControlMode + input_mode: InputMode + #reminder: fields with default values must come after fields without defaults + bus: can_bus.CanBus = identifier(can_bus.ctx) + position_min: float = None + position_max: float = None + + current_state : MotorState = MotorState.UNDEFINED + current_position: float = None + current_velocity: float = None + + def __post_init__(self): + can_bus.add_listener(self.bus, self.axisID, lambda msg: read_message(self, msg)) + +#region Message Handling + +EVENT_HANDLE = dict() +EVENT_IGNORE = ["Get_Encoder_Count", "Get_IQ", "Get_Sensorless_Estimates", "Get_Bus_Voltage_Current", "Get_ADC_Voltage"] + +def read_message(motor, msg): + msg_id = msg.arbitration_id & 0x1F + msg_db = ODRIVE_CAN_DB.get_message_by_frame_id(msg_id) + if msg_db.name in EVENT_HANDLE: + EVENT_HANDLE[msg_db.name](motor, msg_db.decode(msg.data)) + elif msg_db.name not in EVENT_IGNORE: + logging.warning( + f'Unrecognized message "{msg_db.name}" recieved (id {hex(msg_id)})' + ) + +def event_decorator(name): + def wrapper(func): + EVENT_HANDLE[name] = func + return func + return wrapper + +@event_decorator("Heartbeat") +def read_heartbeat(motor, data): + state = data['Axis_State'].value + motor.current_state = state + logging.info(f"Axis {motor.axisID} Heartbeat w/ state: " + str(state)) + axis_error = data['Axis_Error'] + if get_error_num(axis_error) != 0: + logging.error(f"Axis {motor.axisID} Error w/ following data: " + str(axis_error)) + +@event_decorator("Get_Encoder_Estimates") +def update_estimates(motor, data): + motor.current_position = data['Pos_Estimate'] + motor.current_velocity = data['Vel_Estimate'] + +@event_decorator("Get_Motor_Error") +def report_motor_error(motor, data): + logging.error(f"Motor Error on axis {motor.axisID} w/ following data: " + str(data)) + +@event_decorator("Get_Encoder_Error") +def report_encoder_error(motor, data): + logging.error(f"Encoder Error on axis {motor.axisID} w/ following data: " + str(data)) + +@event_decorator("Get_Sensorless_Error") +def report_sensorless_error(motor, data): + logging.error(f"Sensorless Error on axis {motor.axisID} w/ following data: " + str(data)) + +@event_decorator("Get_Controller_Error") +def report_controller_error(motor, data): + logging.error(f"Controller Error on axis {motor.axisID} w/ following data: " + str(data)) + +def get_error_num(error): + #errors are flags, which cantools does not properly handle + #if only 1 flag is present, it returns a "NamedSignalValue" + #if multiple flags are present, it returns an int + #"NamedSignalValue" can not be cast to int, the only way to get the number is to do "error.value", which is invalid if "error" is an int + if error is int: + return error + else: + return error.value + +#endregion + +#region Device Actions + +ctx = create_generic_context("odrive_motor", [ODriveMotor]) + +@device_parser(ctx) +def parse_odrive(data: dict) -> ODriveMotor: + return ODriveMotor(**data) + +@device_action(ctx) +def set_target_position(motor: ODriveMotor, position: float, velocity_FF: float = 0.0, torque_FF: float = 0.0) -> bool: + """Set the target position for this motor + Only use this function if Control Mode is set to POSITION_CONTROL""" + if motor.control_mode != ControlMode.POSITION_CONTROL: + logging.error(f"Can not set position on motor {motor.axisID} when not set to POSITION_CONTROL.") + return False + if motor.position_min != None and position < motor.position_min: + logging.warning(f"Attempting to set position on motor {motor.axisID} past lower bounds.") + position = motor.position_min + elif motor.position_max != None and position > motor.position_max: + logging.warning(f"Attempting to set position on motor {motor.axisID} past upper bounds.") + position = motor.position_max + return send_message(motor, "Set_Input_Pos", {'Input_Pos': position, 'Vel_FF': velocity_FF, 'Torque_FF': torque_FF}) + +@device_action(ctx) +def set_trajectory_velocity(motor: ODriveMotor, velocity: float) -> bool: + return send_message(motor, "Set_Traj_Vel_Limit", {'Traj_Vel_Limit': velocity}) + +@device_action(ctx) +def set_trajectory_accel(motor: ODriveMotor, accel: float, decel: float = None) -> bool: + return send_message(motor, "Set_Traj_Accel_Limits", {'Traj_Accel_Limit': accel, 'Traj_Decel_Limit': decel if decel else accel}) + +@device_action(ctx) +def set_target_velocity(motor: ODriveMotor, velocity: float, torque_FF: float = 0.0) -> bool: + """Set the target velocity for this motor""" + if motor.control_mode == ControlMode.POSITION_CONTROL and motor.input_mode == InputMode.TRAP_TRAJ: + return set_trajectory_velocity(motor, velocity) + elif motor.control_mode != ControlMode.VELOCITY_CONTROL: + logging.error(f"Can not set input velocity on motor {motor.axisID} when not set to VELOCITY_CONTROL.") + return False + return send_message(motor, "Set_Input_Vel", {'Input_Vel': velocity, 'Input_Torque_FF': torque_FF}) + +@device_action(ctx) +def stop(motor: ODriveMotor): + match motor.control_mode: + case ControlMode.POSITION_CONTROL: + if motor.current_position is not None: + set_target_position(motor, motor.current_position) + case ControlMode.VELOCITY_CONTROL: + set_target_velocity(motor, 0) + +@device_action(ctx) +def get_current_position(motor: ODriveMotor) -> float: + return motor.current_position + +@device_action(ctx) +def get_position_limits(motor: ODriveMotor) -> list[float]: + return [motor.position_min, motor.position_max] + +@device_action(ctx) +def get_current_velocity(motor: ODriveMotor) -> float: + return motor.current_velocity + +## These are safety limits set by the hardware team. DO NOT CHANGE THESE LIMITS. +# @device_action(ctx) +# def set_limits(motor: ODriveMotor, velocity_limit: float, current_limit: float) -> bool: +# """Set the velocity and current limits for this motor +# Consider changing the motor's config instead of using this function""" +# return send_message(motor, "Set_Limits", {'Velocity_Limit': velocity_limit, 'Current_Limit': current_limit}) + +@device_action(ctx) +def request_set_state(motor: ODriveMotor, state: MotorState) -> bool: + """Set the state of the motor + Motor will not recieve position/velocity input if not set to CLOSED_LOOP_CONTROL""" + return send_message(motor, "Set_Axis_State", {'Axis_Requested_State': state}) + +@device_action(ctx) +def get_state(motor: ODriveMotor) -> MotorState: + """Returns the most recent state reported by the motor.""" + return motor.current_state + +@device_action(ctx) +def set_controller_mode(motor: ODriveMotor, new_control_mode: ControlMode = None, new_input_mode: InputMode = None) -> bool: + if new_control_mode != ControlMode.POSITION_CONTROL and (motor.position_min is not None or motor.position_max is not None): + logging.warning(f"Motor {motor.axisID} has position limits, but is being set to non-position control. Position limits can not be enforced under non-position control.") + return send_message(motor, "Set_Controller_Mode", + {'Control_Mode': new_control_mode if new_control_mode else motor.control_mode, + 'Input_Mode': new_input_mode if new_input_mode else motor.input_mode}) + +@device_action(ctx) +def get_control_mode(motor: ODriveMotor) -> ControlMode: + return motor.control_mode + +@device_action(ctx) +def get_input_mode(motor: ODriveMotor) -> InputMode: + return motor.input_mode + +@device_action(ctx) +def send_message(motor: ODriveMotor, msg_name: str, data: dict) -> bool: + msg = ODRIVE_CAN_DB.get_message_by_name(msg_name) + msg_id = msg.frame_id | motor.axisID << 5 + data = msg.encode(data) + return can_bus.send_message(motor.bus, msg_id, data) + +#endregion diff --git a/src/raspi/pinconfig.json b/src/raspi/pinconfig.json index 11f3cc3..e8ae8b1 100644 --- a/src/raspi/pinconfig.json +++ b/src/raspi/pinconfig.json @@ -107,5 +107,20 @@ "left_limit_switch": "io_expander_1.channel_4", "right_limit_switch": "io_expander_1.channel_5" } + }, + "can_bus": { + "can_bus_1": { + } + }, + "odrive_motor": { + "odrive_1": { + "bus": "can_bus_1", + "axisID": 0, + + "control_mode": 3, + "input_mode": 5, + "position_min": -35, + "position_max": 35 + } } } \ No newline at end of file diff --git a/src/raspi/samples/can/cantools_testing.py b/src/raspi/samples/can/cantools_testing.py new file mode 100644 index 0000000..ade1778 --- /dev/null +++ b/src/raspi/samples/can/cantools_testing.py @@ -0,0 +1,24 @@ +import cantools +from enum import IntEnum + +db = cantools.database.load_file("src/raspi/odrive-cansimple.dbc") + +heartbeat_msg = db.get_message_by_name("Heartbeat") +heartbeat_data = {'Trajectory_Done_Flag': 0, + 'Controller_Error_Flag': 0, + 'Encoder_Error_Flag': 0, + 'Motor_Error_Flag': 0, + 'Axis_State': 3, + 'Axis_Error': 128} +encoded_data = heartbeat_msg.encode(heartbeat_data) +print(encoded_data) +print(heartbeat_msg.decode(encoded_data)['Axis_Error'] is not int) + +class Test(IntEnum): + woah = 2 + +def Hello(a: Test): + print(a) + +Hello(2) +Hello(Test.woah) \ No newline at end of file diff --git a/src/raspi/samples/can/motor_controller_can.py b/src/raspi/samples/can/motor_controller_can.py index 3bad9fa..5025231 100644 --- a/src/raspi/samples/can/motor_controller_can.py +++ b/src/raspi/samples/can/motor_controller_can.py @@ -4,7 +4,7 @@ import time # initialize can messages and bus -db = cantools.database.load_file("src/raspi/odrive-cansimple.dbc") +db = cantools.database.load_file("src/raspi/component/motor/odrive-cansimple.dbc") print(db.messages) bus = can.Bus("can0", bustype = "socketcan") diff --git a/src/raspi/samples/can/odrive_motor_test.py b/src/raspi/samples/can/odrive_motor_test.py new file mode 100644 index 0000000..7bc50cf --- /dev/null +++ b/src/raspi/samples/can/odrive_motor_test.py @@ -0,0 +1,104 @@ +import logging + +from component.can import can_bus as canbus_action +from component.motor import odrive_motor_action as odrive_action +from component.motor.odrive_enums import * +from state_management import configure_device +from view.pygame import * + +configure_device("src/raspi/pinconfig.json") + +can_bus = "can_bus_1" +motor = "odrive_1" + + +def hydrate_screen(): + """Post setup for the screen (after pygame.init() and global variable are set)""" + logging.info("Hydrating Screen with initial values") + logging.info("Hydrating Screen with initial values") + render_row(0, f"Position: {odrive_action.get_current_position(motor)}") + render_row(1, f"Velocity: {odrive_action.get_current_velocity(motor)}") + render_left_status(False) + render_right_status(False) + + update_screen() + logging.info("Screen Hydrated") + logging.info("Completed Screen Update Events") + + +def main(): + """Main program loop""" + exit = False + left_input = False + right_input = False + speed = 15 + use_speed_limit = True + resting_pos = 0 + while not exit: + change = False + for event in get_keys(): + if is_event_type(event, "down"): + if event.key in [pygame.K_a, pygame.K_LEFT]: + left_input = True + change = True + elif event.key in [pygame.K_d, pygame.K_RIGHT]: + right_input = True + change = True + elif event.key in [pygame.K_w, pygame.K_UP]: + speed += 1 + change = True + elif event.key in [pygame.K_s, pygame.K_DOWN]: + speed -= 1 + change = True + elif event.key in [pygame.K_SPACE]: + use_speed_limit = not use_speed_limit + change = True + elif event.key in [pygame.K_q]: + exit = True + elif is_event_type(event, "up"): + if event.key in [pygame.K_a, pygame.K_LEFT]: + left_input = False + change = True + elif event.key in [pygame.K_d, pygame.K_RIGHT]: + right_input = False + change = True + + position = odrive_action.get_current_position(motor) + velocity = odrive_action.get_current_velocity(motor) + + render_row(0, f"Position: {position}") + render_row(1, f"Velocity: {velocity}") + render_left_status(left_input) + render_right_status(right_input) + render_row(4, f"Speed Limit: {speed if use_speed_limit else 'Uncapped'}") + + if change or (position is not None and abs(position - resting_pos) > 0.1): + pos_limits = odrive_action.get_position_limits(motor) + + if use_speed_limit: + if odrive_action.get_input_mode(motor) == InputMode.PASSTHROUGH: + odrive_action.set_controller_mode(motor, ControlMode.POSITION_CONTROL, InputMode.TRAP_TRAJ) + odrive_action.set_trajectory_velocity(velocity) + else: + if odrive_action.get_input_mode(motor) == InputMode.TRAP_TRAJ: + odrive_action.set_controller_mode(motor, ControlMode.POSITION_CONTROL, InputMode.PASSTHROUGH) + + if left_input: + odrive_action.set_target_position(motor, pos_limits[0]) + elif right_input: + odrive_action.set_target_position(motor, pos_limits[1]) + else: + odrive_action.stop(motor) + + update_screen() + clock_tick(60) + print("Exiting...") + logging.info("Exiting...") + quit_pygame() + canbus_action.close(can_bus) + +print("Initializing...") +setup_pygame() # global variables +hydrate_screen() # hydrate the screen +print("Initialization complete!") +main() diff --git a/src/raspi/view/pygame.py b/src/raspi/view/pygame.py index 2c10e18..b0eda1f 100644 --- a/src/raspi/view/pygame.py +++ b/src/raspi/view/pygame.py @@ -110,6 +110,10 @@ def render_temperature_status(temperature: float): render_text((0, 144, 640, 36), f"Temperature: {temperature}") +def render_row(row: int, text): + render_text((0, 36*row, 640, 36), text) + + def get_keys(): return pygame.event.get()