Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CAN bus and ODrive Motor components #45

Merged
98 changes: 49 additions & 49 deletions src/raspi/component/can/can_bus.py
Original file line number Diff line number Diff line change
@@ -1,64 +1,64 @@
import can
import cantools
import math
import time
import logging
from typing import Callable
from dataclasses import dataclass

from state_management import (
create_generic_context,
device,
device_action,
device_parser,
)

@device
@dataclass(slots = True)
Slaith12 marked this conversation as resolved.
Show resolved Hide resolved
class CanBus:
bus: can.Bus
db: object
listeners: dict
notifier : can.Notifier

def __init__(self,channel = "can0",interface = "socketcan",config_context = None, ignore_config = False, **kwargs):
self.db = cantools.database.load_file("src/raspi/odrive-cansimple.dbc")
self.bus = can.Bus(channel,interface,config_context,ignore_config,**kwargs)


# TODO: Move MotorController into it's own file.
class MotorController:
axis_id: int
can: CanBus
self.listeners = {}
self.notifier = can.Notifier(self.bus, [lambda msg: read_message(self, msg)])
Slaith12 marked this conversation as resolved.
Show resolved Hide resolved

def read_message(can_bus: CanBus, msg) -> None:
axisId = msg.arbitration_id >> 5
if axisId in can_bus.listeners:
can_bus.listeners[axisId](msg)
else:
logging.info("Unhandled message with id " + hex(msg.arbitration_id))
Slaith12 marked this conversation as resolved.
Show resolved Hide resolved

def __init__(self, can: CanBus, axis_id: int):
self.axis_id = axis_id
self.can = can
ctx = create_generic_context("can_bus", [CanBus])


def send_message(self, message_name: str, message_data: dict):
"""
Send a message to this motor controller's can bus.
"""
# create message
msg = self.can.db.get_message_by_name(message_name)
data = msg.encode(message_data)
msg = self.can.Message(arbitration_id = msg.frame_id | self.axis_id << 5, is_extended_id = False, data = data)
logging.info(self.can.db.decode_message(message_name, msg.data))
logging.info(msg)

# send message
try:
self.can.bus.send(msg)
logging.info("Message sent on {}".format(self.can.bus.channel_info))
except self.can.CanError:
logging.error("Message NOT sent! Please verify can0 is working first")

@device_parser(ctx)
def parse_can_bus(data: dict) -> CanBus:
return CanBus(**data)

def receive_single_message(self):
"""
Recieve a single message (likely not the expected one), depends on how can.recv() works
"""
msg = self.can.recv()
logging.info(msg)
return msg
@device_action(ctx)
def send_message(bus: CanBus, arbitration_id: int, data, is_extended_id: bool = False) -> bool:
"""Sends a message through the can bus. Returns whether the send was successful."""
msg = can.Message(arbitration_id=arbitration_id, is_extended_id = is_extended_id, data = data)
try:
bus.bus.send(msg)
return True
except can.CanError:
return False
Slaith12 marked this conversation as resolved.
Show resolved Hide resolved

@device_action(ctx)
def send_message_recurring(bus: CanBus, arbitration_id: int, data, period: float, is_extended_id: bool = False, duration: float = None, modifier_callback: Callable[[object], None] = None) -> object:
"""Repeatedly sends a message through the can bus. Returns a task for it."""
msg = can.Message(arbitration_id=arbitration_id, is_extended_id = is_extended_id, data = data)
return bus.send_periodic(msg, period, duration, modifier_callback=modifier_callback)

def recieve_message_by_name(self,message_name):
"""
Receive a message by name from can.recv(), depends on how can.recv() works
"""
while True:
msg = self.bus.recv()
if msg.arbitration_id == ((self.axis_id << 5) | self.can.bus.db.get_message_by_name(message_name).frame_id):
logging.info(msg)
return msg
@device_action(ctx)
def add_listener(bus: CanBus, axisID: int, callback: Callable[[object], None]) -> None:
"""Adds a listener to check for messages on a certain axis. There should be one listener per axis."""
if axisID in bus.listeners:
logging.warning("Attempting to register multiple listeners for the same CAN Axis when only one allowed. Overriding old listener.")
bus.listeners[axisID] = callback

@device_action(ctx)
def close(bus: CanBus) -> None:
bus.notifier.stop()
Slaith12 marked this conversation as resolved.
Show resolved Hide resolved
bus.bus.shutdown()
1 change: 1 addition & 0 deletions src/raspi/component/motor/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from . import raw_motor as raw_motor_action
from . import odrive_motor as odrive_motor_action
from .pin import *
107 changes: 107 additions & 0 deletions src/raspi/component/motor/odrive_enums.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
from enum import IntEnum, IntFlag

class MotorState(IntEnum):
UNDEFINED = 0
IDLE = 1
STARTUP_SEQUENCE = 2
FULL_CALIBRATION_SEQUENCE = 3
MOTOR_CALIBRATION = 4
ENCODER_INDEX_SEARCH = 6
ENCODER_OFFSET_CALIBRATION = 7
CLOSED_LOOP_CONTROL = 8
LOCKIN_SPIN = 9
ENCODER_DIR_FIND = 10
HOMING = 11
ENCODER_HALL_POLARITY_CALIBRATION = 12
ENCODER_HALL_PHASE_CALIBRATION = 13

class InputMode(IntEnum):
INACTIVE = 0
PASSTHROUGH = 1
VEL_RAMP = 2
POS_FILTER = 3
MIX_CHANNELS = 4
TRAP_TRAJ = 5
TORQUE_RAMP = 6
MIRROR = 7
TUNING = 8

class ControlMode(IntEnum):
VOLTAGE_CONTROL = 0
TORQUE_CONTROL = 1
VELOCITY_CONTROL = 2
POSITION_CONTROL = 3

class AxisError(IntFlag):
NONE = 0
INVALID_STATE = 1
MOTOR_FAILED = 64
SENSORLESS_ESTIMATOR_FAILED = 128
ENCODER_FAILED = 256
CONTROLLER_FAILED = 512
WATCHDOG_TIMER_EXPIRED = 2048
MIN_ENDSTOP_PRESSED = 4096
MAX_ENDSTOP_PRESSED = 8192
ESTOP_REQUESTED = 16384
HOMING_WITHOUT_ENDSTOP = 131072
OVER_TEMP = 262144
UNKNOWN_POSITION = 524288

class MotorError(IntFlag):
NONE = 0
PHASE_RESISTANCE_OUT_OF_RANGE = 1
PHASE_INDUCTANCE_OUT_OF_RANGE = 2
DRV_FAULT = 8
CONTROL_DEADLINE_MISSED = 16
MODULATION_MAGNITUDE = 128
CURRENT_SENSE_SATURATION = 1024
CURRENT_LIMIT_VIOLATION = 4096
MODULATION_IS_NAN = 65536
MOTOR_THERMISTOR_OVER_TEMP = 131072
FET_THERMISTOR_OVER_TEMP = 262144
TIMER_UPDATE_MISSED = 524288
CURRENT_MEASUREMENT_UNAVAILABLE = 1048576
CONTROLLER_FAILED = 2097152
I_BUS_OUT_OF_RANGE = 4194304
BRAKE_RESISTOR_DISARMED = 8388608
SYSTEM_LEVEL = 16777216
BAD_TIMING = 33554432
UNKNOWN_PHASE_ESTIMATE = 67108864
UNKNOWN_PHASE_VEL = 134217728
UNKNOWN_TORQUE = 268435456
UNKNOWN_CURRENT_COMMAND = 536870912
UNKNOWN_CURRENT_MEASUREMENT = 1073741824
UNKNOWN_VBUS_VOLTAGE = 2147483648
UNKNOWN_VOLTAGE_COMMAND = 4294967296
UNKNOWN_GAINS = 8589934592
CONTROLLER_INITIALIZING = 17179869184
UNBALANCED_PHASES = 34359738368

class EncoderError(IntFlag):
NONE = 0
UNSTABLE_GAIN = 1
CPR_POLEPAIRS_MISMATCH = 2
NO_RESPONSE = 4
UNSUPPORTED_ENCODER_MODE = 8
ILLEGAL_HALL_STATE = 16
INDEX_NOT_FOUND_YET = 32
ABS_SPI_TIMEOUT = 64
ABS_SPI_COM_FAIL = 128
ABS_SPI_NOT_READY = 256
HALL_NOT_CALIBRATED_YET = 512

class SensorlessError(IntFlag):
NONE = 0
UNSTABLE_GAIN = 1
UNKNOWN_CURRENT_MEASUREMENT = 2

class ControllerError(IntFlag):
NONE = 0
OVERSPEED = 1
INVALID_INPUT_MODE = 2
UNSTABLE_GAIN = 4
INVALID_MIRROR_AXIS = 8
INVALID_LOAD_ENCODER = 16
INVALID_ESTIMATE = 32
INVALID_CIRCULAR_RANGE = 64
SPINOUT_DETECTED = 128
136 changes: 136 additions & 0 deletions src/raspi/component/motor/odrive_motor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import cantools
import math
import time
import logging
from dataclasses import dataclass
from component.motor.odrive_enums import *

import component.can.can_bus as can_bus

from state_management import (
create_generic_context,
device,
device_action,
device_parser,
identifier
)

db = cantools.database.load_file("src/raspi/component/motor/odrive-cansimple.dbc")
Slaith12 marked this conversation as resolved.
Show resolved Hide resolved

@device
@dataclass()
Slaith12 marked this conversation as resolved.
Show resolved Hide resolved
class ODriveMotor:
axisID: int
bus: can_bus.CanBus = identifier(can_bus.ctx)
current_state : MotorState = MotorState.UNDEFINED
current_position: float = None
current_velocity: float = None
default_state : MotorState = MotorState.CLOSED_LOOP_CONTROL

def __post_init__(self):
can_bus.add_listener(self.bus, self.axisID, lambda msg: read_message(self, msg))

def read_message(motor, msg):
msg_id = msg.arbitration_id & 0x1F
msg_db = db.get_message_by_frame_id(msg_id)
match msg_db.name:
case "Heartbeat": #0x01
data = msg_db.decode(msg.data)
state = data['Axis_State'].value
motor.current_state = state
logging.info("Heartbeat w/ state: " + str(state))
axis_error = data['Axis_Error']
print
if get_error_num(axis_error) != 0:
logging.error("Axis Error w/ following data: " + axis_error)
case "Get_Motor_Error": #0x03
logging.error("Motor Error w/ following data: " + msg_db.decode(msg.data))
case "Get_Encoder_Error": #0x04
logging.error("Encoder Error w/ following data: " + msg_db.decode(msg.data))
case "Get_Sensorless_Error": #0x05
logging.error("Sensorless Error w/ following data: " + msg_db.decode(msg.data))
case "Get_Encoder_Estimates": #0x09
data = msg_db.decode(msg.data)
motor.current_position = data['Pos_Estimate']
motor.current_velocity = data['Vel_Estimate']
print("Estimates Updated")
pass
case "Get_Encoder_Count": #0x0A
pass
case "Get_IQ": #0x14
pass
case "Get_Sensorless_Estimates": #0x15
pass
case "Get_Bus_Voltage_Current": #0x17
pass
case "Get_ADC_Voltage": #0x1C
pass
case "Get_Controller_Error": #0x1D
logging.error("Controller Error w/ following data: " + msg_db.decode(msg.data))
case _:
logging.warning(f"Unrecognized message \"{msg_db.name}\" recieved (id {hex(msg_id)})")
Slaith12 marked this conversation as resolved.
Show resolved Hide resolved

def get_error_num(error):
#errors are flags, which cantools does not properly handle
#if only 1 flag is present, it returns a "NamedSignalValue"
#if multiple flags are present, it returns an int
#"NamedSignalValue" can not be cast to int, the only way to get the number is to do "error.value", which is invalid if "error" is an int
if error is int:
return error
else:
return error.value

ctx = create_generic_context("odrive_motor", [ODriveMotor])

@device_parser(ctx)
def parse_odrive(data: dict) -> ODriveMotor:
motor = ODriveMotor(**data)
return motor
Slaith12 marked this conversation as resolved.
Show resolved Hide resolved

@device_action(ctx)
def set_target_position(motor: ODriveMotor, position: float, velocity_FF: float = 0.0, torque_FF: float = 0.0) -> bool:
return send_message(motor, "Set_Input_Pos", {'Input_Pos': position, 'Vel_FF': velocity_FF, 'Torque_FF': torque_FF})

@device_action(ctx)
def set_target_velocity(motor: ODriveMotor, velocity: float, torque_FF: float = 0.0) -> bool:
return send_message(motor, "Set_Input_Vel", {'Input_Vel': velocity, 'Input_Torque_FF': torque_FF})

@device_action(ctx)
def stop(motor: ODriveMotor) -> bool:
return set_target_velocity(motor, 0)

@device_action(ctx)
def get_current_position(motor: ODriveMotor) -> float:
return motor.current_position

@device_action(ctx)
def get_current_velocity(motor: ODriveMotor) -> float:
return motor.current_velocity

@device_action(ctx)
def set_limits(motor: ODriveMotor, velocity_limit: float, current_limit: float) -> bool:
return send_message(motor, "Set_Limits", {'Velocity_Limit': velocity_limit, 'Current_Limit': current_limit})

@device_action(ctx)
def request_set_state(motor: ODriveMotor, state: MotorState) -> bool:
return send_message(motor, "Set_Axis_State", {'Axis_Requested_State': state})

@device_action(ctx)
def calibrate(motor) -> bool:
pass
Slaith12 marked this conversation as resolved.
Show resolved Hide resolved

@device_action(ctx)
def get_state(motor: ODriveMotor) -> MotorState:
"""Returns the most recent state reported by the motor."""
return motor.current_state

@device_action(ctx)
def set_controller_mode(motor: ODriveMotor, control_mode: ControlMode, input_mode: InputMode) -> bool:
return send_message(motor, "Set_Controller_Mode", {'Control_Mode': control_mode, 'Input_Mode': input_mode})

@device_action(ctx)
def send_message(motor: ODriveMotor, msg_name: str, data: dict) -> bool:
msg = db.get_message_by_name(msg_name)
msg_id = msg.frame_id | motor.axisID << 5
data = msg.encode(data)
return can_bus.send_message(motor.bus, msg_id, data)
12 changes: 12 additions & 0 deletions src/raspi/pinconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -107,5 +107,17 @@
"left_limit_switch": "io_expander_1.channel_4",
"right_limit_switch": "io_expander_1.channel_5"
}
},
"can_bus": {
"can_bus_1": {
"channel": "can0",
"interface": "socketcan"
}
},
"odrive_motor": {
"odrive_1": {
"bus": "can_bus_1",
"axisID": 0
}
}
}
Loading