diff --git a/.gitignore b/.gitignore index 9ac7edd..bdbd200 100644 --- a/.gitignore +++ b/.gitignore @@ -99,7 +99,4 @@ ENV/ # mypy .mypy_cache/ - -# MacOS -.DS_Store -.DS_Store? +.vscode/settings.json diff --git a/examples/rtu_client.py b/examples/rtu_client.py new file mode 100644 index 0000000..e2bb5cd --- /dev/null +++ b/examples/rtu_client.py @@ -0,0 +1,53 @@ +""" + Example Modbus RTU Client + + This example shows how to communicate with a modbus server over RTU. + + Written by FACTS Engineering + Copyright (c) 2023 FACTS Engineering, LLC + Licensed under the MIT license. + +""" + +import time +import board +import busio +from uModBus.serial import RTUClient +import p1am_200_helpers as helpers # For P1AM-SERIAL +from rs485_wrapper import RS485 # If using an RS485 transceiver + +def clear_terminal(): + print(chr(27) + "[2J") + + +# For P1AM-SERIAL using RS232 +comm = helpers.get_serial(1, mode=232, baudrate=115200) + +# For P1AM-SERIAL using RS485 +# uart, de = helpers.get_serial(1, mode=485, baudrate=115200) # For P1AM-SERIAL +# comm = RS485(uart, de, auto_idle_time=.05) # If using an RS485 transceiver + +# For generic RS232 +# comm = busio.UART(board.TX1, board.RX1, baudrate=115200) + +unit_id = 1 # ID of modbus unit +mb_client = RTUClient(comm, default_unit_id=unit_id) # Optionally specify a unit ID + +counter = 0 +while True: + + counter += 1 # increment counter for register 4 + if counter > 32767: + counter = 0 # reset counter + + mb_client.write_single_register(4, counter, unit=unit_id) + current_states = mb_client.read_coils(0, 16, unit=unit_id) + holding_regs = mb_client.read_holding_registers(0, 3) # when unit is not specified, the default_unit_id is used + + clear_terminal() + for i in range(len(current_states)): + print(f"Coil #{i} is {current_states[i]}") + for i in range(len(holding_regs)): + print(f"Register #{i} is {holding_regs[i]}") + + time.sleep(1) diff --git a/examples/rtu_server.py b/examples/rtu_server.py new file mode 100644 index 0000000..206907c --- /dev/null +++ b/examples/rtu_server.py @@ -0,0 +1,60 @@ +""" + Example Modbus RTU Server + + This example shows how to configure a Modbus RTU server. + + Written by FACTS Engineering + Copyright (c) 2023 FACTS Engineering, LLC + Licensed under the MIT license. + +""" + +import board +import digitalio +from uModBus.serial import RTUServer +import p1am_200_helpers as helpers # For P1AM-SERIAL +from rs485_wrapper import RS485 # If using an RS485 transceiver + + +led = digitalio.DigitalInOut(board.LED) +led.switch_to_output() +switch = digitalio.DigitalInOut(board.SWITCH) + +# For P1AM-SERIAL using RS232 +comm = helpers.get_serial(1, mode=232, baudrate=115200) + +# For P1AM-SERIAL using RS485 +# uart, de = helpers.get_serial(1, mode=485, baudrate=115200) # For P1AM-SERIAL +# comm = RS485(uart, de, auto_idle_time=.05) # If using an RS485 transceiver + +# For generic RS232 +# comm = busio.UART(board.TX1, board.RX1, baudrate=115200) + +mb_server = RTUServer( + comm, + unit_addr=1, + number_coils=20, + number_input_registers=0xFF, + number_discrete_inputs=0x10, + number_holding_registers=10, +) + +mb_server.input_registers[0:] = list(range(0xFF)) # set input registers 0-255 to 0-255 + +mb_server.discrete_inputs[5] = True # set input register 5 to True + +mb_server.holding_registers.signed[1] = True # set holding register 1 to use 16-bit signed values + +count = 0 + +while True: + + mb_server.poll(timeout=.5) # Regularly poll the modbus server to handle incoming requests + mb_server.discrete_inputs[0] = switch.value # set discrete input 0 to switch value + mb_server.holding_registers[0] = count # set holding register 0 to count value + mb_server.holding_registers[1] = -count # set holding register 1 to negative count value + led.value = mb_server.coils[0] # set led to output value + + count += 1 + if count > 32767: + count = 0 # reset count \ No newline at end of file diff --git a/examples/tcp_client.py b/examples/tcp_client.py new file mode 100644 index 0000000..a35fdec --- /dev/null +++ b/examples/tcp_client.py @@ -0,0 +1,52 @@ +""" + Example Modbus TCP Client + + This example shows how to communicate with a modbus server over TCP. + + Written by FACTS Engineering + Copyright (c) 2023 FACTS Engineering, LLC + Licensed under the MIT license. + +""" + +import time +import board +import busio +import digitalio +import adafruit_connection_manager +from adafruit_wiznet5k.adafruit_wiznet5k import WIZNET5K +import adafruit_wiznet5k.adafruit_wiznet5k_socketpool as socket +from uModBus.tcp import TCPClient + +def clear_terminal(): + print(chr(27) + "[2J") + + +cs = digitalio.DigitalInOut(board.D5) +spi_bus = busio.SPI(board.SCK, MOSI=board.MOSI, MISO=board.MISO) +eth = WIZNET5K(spi_bus, cs, is_dhcp=True) +pool = adafruit_connection_manager.get_radio_socketpool(eth) +print(eth.pretty_ip(eth.ip_address)) + +client_ip = '192.168.1.101' +unit_id = 255 +mb_client = TCPClient(pool, client_ip, default_unit_id=unit_id) + +counter = 0 +while True: + + counter += 1 # increment counter for register 4 + if counter > 32767: + counter = 0 # reset counter + + mb_client.write_single_register(4, counter, unit=unit_id) + current_states = mb_client.read_coils(0, 16, unit=unit_id) + holding_regs = mb_client.read_holding_registers(0, 3) # when unit is not specified, the default_unit_id is used + + clear_terminal() + for i in range(len(current_states)): + print(f"Coil #{i} is {current_states[i]}") + for i in range(len(holding_regs)): + print(f"Register #{i} is {holding_regs[i]}") + + time.sleep(1) diff --git a/examples/tcp_server.py b/examples/tcp_server.py new file mode 100644 index 0000000..7d9f6eb --- /dev/null +++ b/examples/tcp_server.py @@ -0,0 +1,71 @@ +""" + Example Modbus TCP Server + + This example shows how to configure a Modbus TCP server. + + Written by FACTS Engineering + Copyright (c) 2023 FACTS Engineering, LLC + Licensed under the MIT license. + +""" + +import board +import busio +import digitalio +import p1am_200_helpers as helpers # For P1AM-ETH +from adafruit_wiznet5k.adafruit_wiznet5k import WIZNET5K +import adafruit_wiznet5k.adafruit_wiznet5k_socket as socket +from uModBus.tcp import TCPServer + + +led = digitalio.DigitalInOut(board.LED) +led.switch_to_output() +switch = digitalio.DigitalInOut(board.SWITCH) + +# For P1AM-ETH +eth = helpers.get_ethernet(False) # DHCP False + +# For generic ethernet +# cs = digitalio.DigitalInOut(board.D5) +# spi_bus = busio.SPI(board.SCK, MOSI=board.MOSI, MISO=board.MISO) +# eth = WIZNET5K(spi_bus, cs, is_dhcp=False) + +IP_ADDRESS = (192, 168, 1, 177) +SUBNET_MASK = (255, 255, 248, 0) +GATEWAY_ADDRESS = (192, 168, 0, 1) +DNS_SERVER = (8, 8, 8, 8) +eth.ifconfig = (IP_ADDRESS, SUBNET_MASK, GATEWAY_ADDRESS, DNS_SERVER) + +socket.set_interface(eth) +server_ip = eth.pretty_ip(eth.ip_address) +mb_server = TCPServer( + socket, + server_ip, + number_coils=0x20, + number_input_registers=0xFF, + number_discrete_inputs=0x10, + number_holding_registers=10, +) + +mb_server.input_registers[0:] = list(range(0xFF)) # set input registers 0-255 to 0-255 + +mb_server.discrete_inputs[5] = True + +mb_server.holding_registers.signed[1] = True # set holding register 1 to use 16-bit signed values + +count = 0 + +while True: + + try: + mb_server.poll(timeout=.1) # Regularly poll the modbus server to handle incoming requests + except RuntimeError as e: + pass # Ignore errors in case the client disconnects mid-poll + mb_server.discrete_inputs[0] = switch.value # set discrete input 0 to switch value + mb_server.holding_registers[0] = count # set holding register 0 to count value + mb_server.holding_registers[1] = -count # set holding register 1 to count value + led.value = mb_server.coils[0] # set led to output value + + count += 1 + if count > 32767: + count = 0 # reset count diff --git a/main.py b/main.py deleted file mode 100644 index f38b0f7..0000000 --- a/main.py +++ /dev/null @@ -1,105 +0,0 @@ -#!/usr/bin/env python -# -# Copyright (c) 2019, Pycom Limited. -# -# This software is licensed under the GNU GPL version 3 or any -# later version, with permitted additional terms. For more information -# see the Pycom Licence v1.0 document supplied with this file, or -# available at https://www.pycom.io/opensource/licensing -# -from uModBus.serial import Serial -from uModBus.tcp import TCP -from network import WLAN -import machine - -####################### TCP MODBUS ######################### -WIFI_SSID = 'your ssid' -WIFI_PASS = 'your password' - -wlan = WLAN(mode=WLAN.STA) -wlan.connect(WIFI_SSID, auth=(None, WIFI_PASS), timeout=5000) -while not wlan.isconnected(): - machine.idle() # save power while waiting - -print('WLAN connection succeeded!') - -slave_ip = 'slave ip' -modbus_obj = TCP(slave_ip) - -######################### RTU SERIAL MODBUS ######################### -#uart_id = 0x01 -#modbus_obj = Serial(uart_id, pins=('P9', 'P10')) -# if the serial modbus requires a ctrl pin, you can set it like this: -#modbus_obj = Serial(uart_id, pins=('P9','P10'), ctrl_pin='P8') - -######################### READ COILS ######################### -#slave_addr=0x0A -#starting_address=0x00 -#coil_quantity=100 - -#coil_status = modbus_obj.read_coils(slave_addr, starting_address, coil_quantity) -#print('Coil status: ' + ' '.join('{:d}'.format(x) for x in coil_status)) - -###################### READ DISCRETE INPUTS ################## -#slave_addr=0x0A -#starting_address=0x0 -#input_quantity=100 - -#input_status = modbus_obj.read_discrete_inputs(slave_addr, starting_address, input_quantity) -#print('Input status: ' + ' '.join('{:d}'.format(x) for x in input_status)) - -###################### READ HOLDING REGISTERS ################## -#slave_addr=0x0A -#starting_address=0x00 -#register_quantity=100 -#signed=True - -#register_value = modbus_obj.read_holding_registers(slave_addr, starting_address, register_quantity, signed) -#print('Holding register value: ' + ' '.join('{:d}'.format(x) for x in register_value)) - -###################### READ INPUT REGISTERS ################## -#slave_addr=0x0A -#starting_address=0x00 -#register_quantity=100 -#signed=True - -#register_value = modbus_obj.read_input_registers(slave_addr, starting_address, register_quantity, signed) -#print('Input register value: ' + ' '.join('{:d}'.format(x) for x in register_value)) - -###################### WRITE SINGLE COIL ################## -#slave_addr=0x0A -#output_address=0x00 -#output_value=0xFF00 - -#return_flag = modbus_obj.write_single_coil(slave_addr, output_address, output_value) -#output_flag = 'Success' if return_flag else 'Failure' -#print('Writing single coil status: ' + output_flag) - -###################### WRITE SINGLE REGISTER ################## -#slave_addr=0x0A -#register_address=0x01 -#register_value=-32768 -#signed=True - -#return_flag = modbus_obj.write_single_register(slave_addr, register_address, register_value, signed) -#output_flag = 'Success' if return_flag else 'Failure' -#print('Writing single coil status: ' + output_flag) - -###################### WRITE MULIPLE COILS ################## -#slave_addr=0x0A -#starting_address=0x00 -#output_values=[1,1,1,0,0,1,1,1,0,0,1,1,1] - -#return_flag = modbus_obj.write_multiple_coils(slave_addr, starting_address, output_values) -#output_flag = 'Success' if return_flag else 'Failure' -#print('Writing multiple coil status: ' + output_flag) - -###################### WRITE MULIPLE REGISTERS ################## -slave_addr=0x0A -register_address=0x01 -register_values=[2, -4, 6, -256, 1024] -signed=True - -return_flag = modbus_obj.write_multiple_registers(slave_addr, register_address, register_values, signed) -output_flag = 'Success' if return_flag else 'Failure' -print('Writing multiple register status: ' + output_flag) diff --git a/readme.md b/readme.md new file mode 100644 index 0000000..1dbdf21 --- /dev/null +++ b/readme.md @@ -0,0 +1,117 @@ +# circuitpython-modbus + +CircuitPython Modbus library supporting TCP and RTU protocols as both a client and server. + +## Usage + +Modbus TCP server using the Wiznet5k. + +```python + +#Modbus TCP Server + +import board +import busio +import digitalio +from adafruit_wiznet5k.adafruit_wiznet5k import WIZNET5K +import adafruit_wiznet5k.adafruit_wiznet5k_socket as socket +from uModBus.tcp import TCPServer + + +led = digitalio.DigitalInOut(board.LED) +led.switch_to_output() +switch = digitalio.DigitalInOut(board.SWITCH) + +cs = digitalio.DigitalInOut(board.D5) +spi_bus = busio.SPI(board.SCK, MOSI=board.MOSI, MISO=board.MISO) +eth = WIZNET5K(spi_bus, cs, is_dhcp=False) + +IP_ADDRESS = (192, 168, 1, 177) +SUBNET_MASK = (255, 255, 248, 0) +GATEWAY_ADDRESS = (192, 168, 0, 1) +DNS_SERVER = (8, 8, 8, 8) +eth.ifconfig = (IP_ADDRESS, SUBNET_MASK, GATEWAY_ADDRESS, DNS_SERVER) + +socket.set_interface(eth) +server_ip = eth.pretty_ip(eth.ip_address) +mb_server = TCPServer( + socket, + server_ip, + number_coils=0x20, + number_input_registers=0xFF, + number_discrete_inputs=0x10, + number_holding_registers=10, +) + +mb_server.input_registers = list(range(0xFF)) +mb_server.discrete_inputs[5] = True +count = 0 + +while True: + + try: + mb_server.poll(timeout=.1) # Regularly poll the modbus server to handle incoming requests + except RuntimeError as e: + pass # Ignore errors in case the client disconnects mid-poll + mb_server.discrete_inputs[0] = switch.value # set discrete input 0 to switch value + mb_server.holding_registers[0] = count # set holding register 0 to count value + led.value = mb_server.coils[0] # set led to output value + + count += 1 + if count > 32767: + count = 0 # reset count + +``` + + +Modbus RTU client using a RS232 or RS485 interface. + +```python +import time +import board +import busio +from uModBus.serial import RTUClient +import p1am_200_helpers as helpers # For P1AM-SERIAL +from rs485_wrapper import RS485 # If using an RS485 transceiver + +def clear_terminal(): + print(chr(27) + "[2J") + + +# For P1AM-SERIAL using RS232 +comm = helpers.get_serial(1, mode=232, baudrate=115200) + +# For P1AM-SERIAL using RS485 +# uart, de = helpers.get_serial(1, mode=485, baudrate=115200) # For P1AM-SERIAL +# comm = RS485(uart, de, auto_idle_time=.05) # If using an RS485 transceiver + +# For generic RS232 +# comm = busio.UART(board.TX1, board.RX1, baudrate=115200) + +unit_id = 1 # ID of modbus unit +mb_client = RTUClient(comm, default_unit_id=unit_id) # Optionally specify a unit ID + +counter = 0 +while True: + + counter += 1 # increment counter for register 4 + if counter > 32767: + counter = 0 # reset counter + + mb_client.write_single_register(4, counter, unit=unit_id) + current_states = mb_client.read_coils(0, 16, unit=unit_id) + holding_regs = mb_client.read_holding_registers(0, 3) # when unit is not specified, the default_unit_id is used + + clear_terminal() + for i in range(len(current_states)): + print(f"Coil #{i} is {current_states[i]}") + for i in range(len(holding_regs)): + print(f"Register #{i} is {holding_regs[i]}") + + time.sleep(1) + +``` + +## License +This library is a fork of the [sfera-labs/pycom-modbus](https://github.com/sfera-labs/pycom-modbus) library. +The source is licensed under GPL v3.0 from the original author Pycom Ltd. Information on the license can be found [here](https://pycom.io/licensing) diff --git a/uModbus/common.py b/uModbus/common.py index d53dee9..a9b9b16 100644 --- a/uModbus/common.py +++ b/uModbus/common.py @@ -1,77 +1,324 @@ -import uModBus.const as Const import struct +import uModBus.const as Const +import uModBus.functions as functions + +class Client: + def __init__(self, default_unit_id): + self._default_unit_id = default_unit_id + pass + + def read_coils(self, starting_addr, coil_qty, *, unit=None): + modbus_pdu = functions.read_coils(starting_addr, coil_qty) + if unit is None: + unit = self._default_unit_id + + response = self._send_receive(unit, modbus_pdu, True) + status_pdu = self._bytes_to_bool(response) + + return status_pdu[:coil_qty] + + def read_discrete_inputs(self, starting_addr, input_qty, *, unit=None): + modbus_pdu = functions.read_discrete_inputs(starting_addr, input_qty) + if unit is None: + unit = self._default_unit_id + + response = self._send_receive(unit, modbus_pdu, True) + status_pdu = self._bytes_to_bool(response) + + return status_pdu[:input_qty] + + def read_holding_registers(self, starting_addr, register_qty, *, unit=None, signed = True): + modbus_pdu = functions.read_holding_registers(starting_addr, register_qty) + if unit is None: + unit = self._default_unit_id + + response = self._send_receive(unit, modbus_pdu, True) + register_value = self._to_short(response, signed) + + return register_value + + def read_input_registers(self, starting_address, register_quantity, *, unit=None, signed = True): + modbus_pdu = functions.read_input_registers(starting_address, register_quantity) + if unit is None: + unit = self._default_unit_id + + response = self._send_receive(unit, modbus_pdu, True) + register_value = self._to_short(response, signed) + + return register_value + + def write_single_coil(self, output_address, output_value, *, unit=None): + modbus_pdu = functions.write_single_coil(output_address, output_value) + if unit is None: + unit = self._default_unit_id + + response = self._send_receive(unit, modbus_pdu, False) + operation_status = functions.validate_resp_data(response, Const.WRITE_SINGLE_COIL, + output_address, value=output_value, signed=False) + + return operation_status + + def write_single_register(self, register_address, register_value, *, unit=None, signed=True): + modbus_pdu = functions.write_single_register(register_address, register_value, signed) + if unit is None: + unit = self._default_unit_id + + + response = self._send_receive(unit, modbus_pdu, False) + operation_status = functions.validate_resp_data(response, Const.WRITE_SINGLE_REGISTER, + register_address, value=register_value, signed=signed) + + return operation_status + + def write_multiple_coils(self, starting_address, output_values, *, unit=None): + modbus_pdu = functions.write_multiple_coils(starting_address, output_values) + if unit is None: + unit = self._default_unit_id + + response = self._send_receive(unit, modbus_pdu, False) + operation_status = functions.validate_resp_data(response, Const.WRITE_MULTIPLE_COILS, + starting_address, quantity=len(output_values)) + + return operation_status + + def write_multiple_registers(self, starting_address, register_values, *, unit=None, signed=True): + modbus_pdu = functions.write_multiple_registers(starting_address, register_values, signed) + if unit is None: + unit = self._default_unit_id + + response = self._send_receive(unit, modbus_pdu, False) + operation_status = functions.validate_resp_data(response, Const.WRITE_MULTIPLE_REGISTERS, + starting_address, quantity=len(register_values)) + + return operation_status + + def _bytes_to_bool(self, byte_list): + bool_list = [] + for index, byte in enumerate(byte_list): + bool_list.extend([bool(byte & (1 << n)) for n in range(8)]) + + return bool_list + + def _to_short(self, byte_array, signed=True): + response_quantity = int(len(byte_array) / 2) + fmt = '>' + (('h' if signed else 'H') * response_quantity) + + return struct.unpack(fmt, byte_array) + +class Server: + def __init__(self, unit_addr=None, *, number_coils=None, number_discrete_inputs=None, + number_input_registers=None, number_holding_registers=None): + self.unit_addr = unit_addr -class Request: - def __init__(self, interface, data): - self._itf = interface - self.unit_addr = data[0] - self.function, self.register_addr = struct.unpack_from('>BH', data, 1) - - if self.function in [Const.READ_COILS, Const.READ_DISCRETE_INPUTS]: - self.quantity = struct.unpack_from('>H', data, 4)[0] - if self.quantity < 0x0001 or self.quantity > 0x07D0: - raise ModbusException(self.function, Const.ILLEGAL_DATA_VALUE) - self.data = None - - elif self.function in [Const.READ_HOLDING_REGISTERS, Const.READ_INPUT_REGISTER]: - self.quantity = struct.unpack_from('>H', data, 4)[0] - if self.quantity < 0x0001 or self.quantity > 0x007D: - raise ModbusException(self.function, Const.ILLEGAL_DATA_VALUE) - self.data = None - - elif self.function == Const.WRITE_SINGLE_COIL: - self.quantity = None - self.data = data[4:6] + if number_coils is not None: + self.coils = [0] * number_coils + + if number_discrete_inputs is not None: + self.discrete_inputs = [0] * number_discrete_inputs + + if number_input_registers is not None: + self.input_registers = _ValueRegisters(number_input_registers) + + if number_holding_registers is not None: + self.holding_registers = _ValueRegisters(number_holding_registers) + + + def handle_request(self, data): + unit_addr = data[0] + if self.unit_addr is not None and self.unit_addr != unit_addr: + print(f"Unit address {unit_addr} does not match {self.unit_addr}") + return + + function_code, address = struct.unpack_from('>BH', data, 1) + + if function_code in [Const.READ_COILS, Const.READ_DISCRETE_INPUTS]: + quantity = struct.unpack_from('>H', data, 4)[0] + if not self._within_limits(function_code, quantity, address): + self.send_exception(function_code, Const.ILLEGAL_DATA_ADDRESS) + return + if function_code == Const.READ_COILS: + data = self.coils[address:address+quantity] + else: + data = self.discrete_inputs[address:address+quantity] + + elif function_code in [Const.READ_HOLDING_REGISTERS, Const.READ_INPUT_REGISTER]: + quantity = struct.unpack_from('>H', data, 4)[0] + if not self._within_limits(function_code, quantity, address): + self.send_exception(function_code, Const.ILLEGAL_DATA_ADDRESS) + return + + if function_code == Const.READ_HOLDING_REGISTERS: + data = self.holding_registers.raw[address:address+quantity] + else: + data = self.input_registers.raw[address:address+quantity] + data = b''.join(data) + + elif function_code == Const.WRITE_SINGLE_COIL: + quantity = None + data = data[4:6] + if not self._within_limits(function_code, quantity, address): + self.send_exception(function_code, Const.ILLEGAL_DATA_ADDRESS) + return # allowed values: 0x0000 or 0xFF00 - if (self.data[0] not in [0x00, 0xFF]) or self.data[1] != 0x00: - raise ModbusException(self.function, Const.ILLEGAL_DATA_VALUE) + if (data[0] not in [0x00, 0xFF]) or data[1] != 0x00: + self.send_exception(function_code, Const.ILLEGAL_DATA_ADDRESS) + return + self.coils[address] = data[0] & 1 - elif self.function == Const.WRITE_SINGLE_REGISTER: - self.quantity = None - self.data = data[4:6] + elif function_code == Const.WRITE_SINGLE_REGISTER: + quantity = None + data = data[4:6] + if not self._within_limits(function_code, quantity, address): + self.send_exception(function_code, Const.ILLEGAL_DATA_ADDRESS) + return + self.holding_registers.raw[address] = data # all values allowed - elif self.function == Const.WRITE_MULTIPLE_COILS: - self.quantity = struct.unpack_from('>H', data, 4)[0] - if self.quantity < 0x0001 or self.quantity > 0x07D0: - raise ModbusException(self.function, Const.ILLEGAL_DATA_VALUE) - self.data = data[7:] - if len(self.data) != ((self.quantity - 1) // 8) + 1: - raise ModbusException(self.function, Const.ILLEGAL_DATA_VALUE) - - elif self.function == Const.WRITE_MULTIPLE_REGISTERS: - self.quantity = struct.unpack_from('>H', data, 4)[0] - if self.quantity < 0x0001 or self.quantity > 0x007B: - raise ModbusException(self.function, Const.ILLEGAL_DATA_VALUE) - self.data = data[7:] - if len(self.data) != self.quantity * 2: - raise ModbusException(self.function, Const.ILLEGAL_DATA_VALUE) + elif function_code == Const.WRITE_MULTIPLE_COILS: + quantity = struct.unpack_from('>H', data, 4)[0] + if not self._within_limits(function_code, quantity, address): + self.send_exception(function_code, Const.ILLEGAL_DATA_ADDRESS) + raise ModbusException(function_code, Const.ILLEGAL_DATA_VALUE, self) + data = data[7:] + if len(data) != ((quantity - 1) // 8) + 1: + self.send_exception(function_code, Const.ILLEGAL_DATA_ADDRESS) + raise ModbusException(function_code, Const.ILLEGAL_DATA_VALUE, self) + self.coils[address:address+quantity] = self.data_as_bits(data, quantity) + + elif function_code == Const.WRITE_MULTIPLE_REGISTERS: + quantity = struct.unpack_from('>H', data, 4)[0] + if not self._within_limits(function_code, quantity, address): + self.send_exception(function_code, Const.ILLEGAL_DATA_ADDRESS) + return + data = data[7:] + if len(data) != quantity * 2: + self.send_exception(function_code, Const.ILLEGAL_DATA_ADDRESS) + raise ModbusException(function_code, Const.ILLEGAL_DATA_VALUE, self) + self.holding_registers.raw[address:address+quantity] = [data[i:i+2] for i in range(0, quantity*2, 2)] else: # Not implemented functions - self.quantity = None - self.data = data[4:] + quantity = None + data = data[4:] + self.send_exception(function_code, Const.ILLEGAL_FUNCTION) + return + + self.send_response(unit_addr, function_code, address, quantity, data, data) + + return (function_code, address, quantity) + + def send_response(self, slave_addr, function_code, request_register_addr, request_register_qty, request_data, values=None, signed=False): + modbus_pdu = functions.response(function_code, request_register_addr, request_register_qty, request_data, values, signed) + self._send(modbus_pdu, slave_addr) + + def send_exception_response(self, slave_addr, function_code, exception_code): + modbus_pdu = functions.exception_response(function_code, exception_code) + self._send(modbus_pdu, slave_addr) - def send_response(self, values=None, signed=True): - self._itf.send_response(self.unit_addr, self.function, self.register_addr, self.quantity, self.data, values, signed) + def send_exception(self, function_code, exception_code): + addr = self.unit_addr + if addr is None: + addr = 255 + self.send_exception_response(addr, function_code, exception_code) - def send_exception(self, exception_code): - self._itf.send_exception_response(self.unit_addr, self.function, exception_code) - def data_as_bits(self): + def data_as_bits(self, data, quantity): bits = [] - for byte in self.data: + for byte in data: for i in range(0, 8): bits.append((byte >> i) & 1) - if len(bits) == self.quantity: + if len(bits) == quantity: return bits - def data_as_registers(self, signed=True): - qty = self.quantity if (self.quantity != None) else 1 - fmt = ('h' if signed else 'H') * qty - return struct.unpack('>' + fmt, self.data) + def _within_limits(self, function_code, quantity, address): + + if function_code == Const.READ_DISCRETE_INPUTS: + object_count = len(self.discrete_inputs) + quantity_max = 0x07D0 + elif function_code in [Const.READ_COILS, Const.WRITE_SINGLE_COIL, Const.WRITE_MULTIPLE_COILS]: + object_count = len(self.coils) + quantity_max = 0x07D0 + elif function_code == Const.READ_INPUT_REGISTER: + object_count = len(self.input_registers) + quantity_max = 0x007D + else: # Holding register + object_count = len(self.holding_registers) + quantity_max = 0x007D + + if quantity is not None and (quantity < 1 or quantity > quantity_max): + return False + + if quantity == None: + quantity = 0 + + if quantity + address > object_count: + return False + else: + return True + class ModbusException(Exception): - def __init__(self, function_code, exception_code): + def __init__(self, function_code, exception_code, instance): + instance.send_exception_response(instance.unit_addr, function_code, exception_code) self.function_code = function_code self.exception_code = exception_code + + +class _ValueRegisters(): + def __init__(self, length): + self.raw = [bytes(2)] * length + self.signed = [False] * length + self.byteswap = [False] * length + + def __len__(self): + return len(self.raw) + + def __setitem__(self, index, value): + if isinstance(index, int): + self._set_value(index, value) + elif isinstance(index, slice): + start = index.start + if start is None: + start = 0 + end = index.stop + if end is None: + end = len(self) + end = min(end, start + len(value)) + + for i in range(start, end): + self._set_value(i, value[i-start]) + + else: + raise TypeError('Index must be an integer or slice') + + + def __getitem__(self, index): + if isinstance(index, int): + return self._get_value(index) + elif isinstance(index, slice): + start = index.start + if start is None: + start = 0 + end = index.stop + if end is None: + end = len(self) + + return [self._get_value(i) for i in range(start, end)] + else: + raise TypeError('Index must be an integer or slice') + + def _set_value(self, index, value): + format = '<' if self.byteswap[index] else '>' + format += 'h' if self.signed[index] else 'H' + + try: + self.raw[index] = struct.pack(format, value) + except OverflowError: + raise OverflowError(f'Address {index} value {value} must be between {((-32768 if self.signed[index] else 0))} and {(32767 if self.signed[index] else 65535)}') + + + def _get_value(self, index): + format = '<' if self.byteswap[index] else '>' + format += 'h' if self.signed[index] else 'H' + + return struct.unpack(format, self.raw[index])[0] diff --git a/uModbus/const.py b/uModbus/const.py index e004508..d6558db 100644 --- a/uModbus/const.py +++ b/uModbus/const.py @@ -7,6 +7,9 @@ # see the Pycom Licence v1.0 document supplied with this file, or # available at https://www.pycom.io/opensource/licensing # +# Modified by FACTS Engineering 2023 +# +# # function codes READ_DISCRETE_INPUTS = 0x02 @@ -51,6 +54,8 @@ FIXED_RESP_LEN = 0x08 MBAP_HDR_LENGTH = 0x07 +MAX_MSG_LENGTH = 253 + CRC16_TABLE = ( 0x0000,0xC0C1,0xC181,0x0140,0xC301,0x03C0,0x0280,0xC241,0xC601, 0x06C0,0x0780,0xC741,0x0500,0xC5C1,0xC481,0x0440,0xCC01,0x0CC0, diff --git a/uModbus/functions.py b/uModbus/functions.py index 9c4e269..d050545 100644 --- a/uModbus/functions.py +++ b/uModbus/functions.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python -# # Copyright (c) 2019, Pycom Limited. # # This software is licensed under the GNU GPL version 3 or any @@ -7,6 +5,8 @@ # see the Pycom Licence v1.0 document supplied with this file, or # available at https://www.pycom.io/opensource/licensing # +# Modified by FACTS Engineering 2023 + import uModBus.const as Const import struct @@ -35,8 +35,8 @@ def read_input_registers(starting_address, quantity): return struct.pack('>BHH', Const.READ_INPUT_REGISTER, starting_address, quantity) def write_single_coil(output_address, output_value): - if output_value not in [0x0000, 0xFF00]: - raise ValueError('Illegal coil value') + if output_value != 0: + output_value = 0xFF00 return struct.pack('>BHH', Const.WRITE_SINGLE_COIL, output_address, output_value) @@ -84,7 +84,7 @@ def validate_resp_data(data, function_code, address, value=None, quantity=None, return False -def response(function_code, request_register_addr, request_register_qty, request_data, value_list=None, signed=True): +def response(function_code, request_register_addr, request_register_qty, request_data, value_list=None, signed=False): if function_code in [Const.READ_COILS, Const.READ_DISCRETE_INPUTS]: sectioned_list = [value_list[i:i + 8] for i in range(0, len(value_list), 8)] @@ -97,19 +97,7 @@ def response(function_code, request_register_addr, request_register_qty, request return struct.pack('>BB' + fmt, function_code, ((len(value_list) - 1) // 8) + 1, *output_value) elif function_code in [Const.READ_HOLDING_REGISTERS, Const.READ_INPUT_REGISTER]: - quantity = len(value_list) - - if not (0x0001 <= quantity <= 0x007D): - raise ValueError('invalid number of registers') - - if signed == True or signed == False: - fmt = ('h' if signed else 'H') * quantity - else: - fmt = '' - for s in signed: - fmt += 'h' if s else 'H' - - return struct.pack('>BB' + fmt, function_code, quantity * 2, *value_list) + return struct.pack('>BB', function_code, len(value_list)) + value_list elif function_code in [Const.WRITE_SINGLE_COIL, Const.WRITE_SINGLE_REGISTER]: return struct.pack('>BHBB', function_code, request_register_addr, *request_data) diff --git a/uModbus/serial.py b/uModbus/serial.py index 49fbb9f..cdf0337 100644 --- a/uModbus/serial.py +++ b/uModbus/serial.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python -# # Copyright (c) 2019, Pycom Limited. # # This software is licensed under the GNU GPL version 3 or any @@ -7,51 +5,82 @@ # see the Pycom Licence v1.0 document supplied with this file, or # available at https://www.pycom.io/opensource/licensing # -import uModBus.functions as functions +# Modified by FACTS Engineering 2023 + +import time +import struct import uModBus.const as Const -from uModBus.common import Request from uModBus.common import ModbusException -from machine import UART -from machine import Pin -import struct -import time -import machine +from uModBus.common import Server, Client + +def _t35chars_time(baudrate, data_bits, stop_bits): + if baudrate <= 19200: + return (3.5 * (data_bits + stop_bits + 2)) / baudrate + else: + return .001750 + +def _rtu_send(ctx, modbus_pdu, slave_addr): + serial_pdu = bytearray() + serial_pdu.append(slave_addr) + serial_pdu.extend(modbus_pdu) -class Serial: + crc = _calculate_crc16(serial_pdu) + serial_pdu.extend(crc) + ctx._uart.write(serial_pdu) + time.sleep(ctx._t35chars) - def __init__(self, uart_id, baudrate=9600, data_bits=8, stop_bits=1, parity=None, pins=None, ctrl_pin=None): - self._uart = UART(uart_id, baudrate=baudrate, bits=data_bits, parity=parity, \ - stop=stop_bits, timeout_chars=2, pins=pins) - if ctrl_pin is not None: - self._ctrlPin = Pin(ctrl_pin, mode=Pin.OUT) - else: - self._ctrlPin = None +def _calculate_crc16(data): + crc = 0xFFFF - if baudrate <= 19200: - self._t35chars = (3500000 * (data_bits + stop_bits + 2)) // baudrate - else: - self._t35chars = 1750 + for char in data: + crc = (crc >> 8) ^ Const.CRC16_TABLE[((crc) ^ char) & 0xFF] - def _calculate_crc16(self, data): - crc = 0xFFFF + return struct.pack('> 8) ^ Const.CRC16_TABLE[((crc) ^ char) & 0xFF] +def _uart_read_frame(ctx, timeout=None): + frame = bytearray() - return struct.pack('= 8: + return frame - def _bytes_to_bool(self, byte_list): - bool_list = [] - for index, byte in enumerate(byte_list): - bool_list.extend([bool(byte & (1 << n)) for n in range(8)]) + return None - return bool_list +def _validate_resp_hdr(response, slave_addr, function_code, count): - def _to_short(self, byte_array, signed=True): - response_quantity = int(len(byte_array) / 2) - fmt = '>' + (('h' if signed else 'H') * response_quantity) + if len(response) == 0: + raise OSError('no data received from slave') - return struct.unpack(fmt, byte_array) + resp_crc = response[-Const.CRC_LENGTH:] + expected_crc = _calculate_crc16(response[:-Const.CRC_LENGTH]) + + if resp_crc != expected_crc: + print(f"Bad CRC - \n\tReceived - {hex(resp_crc[0]) + hex(resp_crc[1])[-2:]} \n\t Expected - {hex(expected_crc[0]) + hex(expected_crc[1])[-2:]}") + raise OSError(f"Response was: {[hex(i) for i in response]}") + + if (response[0] != slave_addr): + raise ValueError('wrong slave address') + + if (response[1] == (function_code + Const.ERROR_BIAS)): + raise ValueError('slave returned exception code: {:d}'.format(response[2])) + + hdr_length = Const.RESPONSE_HDR_LENGTH + int(count) + return response[hdr_length:-Const.CRC_LENGTH] + +class RTUClient(Client): + def __init__(self, uart, *, default_unit_id=0x00, timeout=None, data_bits=8, stop_bits=1): + super().__init__(default_unit_id) + self._uart = uart + self.timeout = timeout + self._t35chars = _t35chars_time(self._uart.baudrate, data_bits, stop_bits) def _exit_read(self, response): if response[1] >= Const.ERROR_BIAS: @@ -68,172 +97,73 @@ def _exit_read(self, response): def _uart_read(self): response = bytearray() - - for x in range(1, 40): - if self._uart.any(): - response.extend(self._uart.readall()) - # variable length function codes may require multiple reads - if self._exit_read(response): - break - time.sleep(0.05) - - return response - - def _uart_read_frame(self, timeout=None): - bytes = bytearray() - - start_ms = time.ticks_ms() - while timeout == None or time.ticks_diff(start_ms, time.ticks_ms()) <= timeout: - last_byte_ts = time.ticks_us() - while time.ticks_diff(last_byte_ts, time.ticks_us()) <= self._t35chars: - r = self._uart.readall() - if r != None: - bytes.extend(r) - last_byte_ts = time.ticks_us() - - if len(bytes) > 0: - return bytes - - return bytes + start = time.monotonic() + while True: + waiting = self._uart.in_waiting + time.sleep(self._t35chars) + if waiting > 0: + while waiting != self._uart.in_waiting: # give timeout period + time.sleep(self._t35chars) + waiting = self._uart.in_waiting + response.extend(self._uart.read(waiting)) + start = time.monotonic() # reset timeout on new data + + if len(response) >= Const.ERROR_RESP_LEN and self._exit_read(response): + return response + + if self.timeout is not None: + if time.monotonic() - start > self.timeout: + return response + + + def _send(self, slave_addr, modbus_pdu): + _rtu_send(self, modbus_pdu, slave_addr) + + def _send_receive(self, slave_addr, modbus_pdu, count): + try: + self._uart.reset_input_buffer() + self._send(slave_addr, modbus_pdu) + resp = self._uart_read() + return _validate_resp_hdr(resp, slave_addr, modbus_pdu[0], count) + except (OSError, ValueError): # retry to help with devices with lax timing + time.sleep(self._t35chars * 2) + self._uart.reset_input_buffer() + self._send(slave_addr, modbus_pdu) + resp = self._uart_read() + return _validate_resp_hdr(resp, slave_addr, modbus_pdu[0], count) + +class RTUServer(Server): + def __init__(self, uart, data_bits=8, stop_bits=1, *, unit_addr=1, number_coils=None, number_discrete_inputs=None, + number_input_registers=None, number_holding_registers=None): + super().__init__( + unit_addr, + number_coils=number_coils, + number_discrete_inputs=number_discrete_inputs, + number_input_registers=number_input_registers, + number_holding_registers=number_holding_registers + ) + + self._uart = uart + self._t35chars = _t35chars_time(self._uart.baudrate, data_bits, stop_bits) def _send(self, modbus_pdu, slave_addr): - serial_pdu = bytearray() - serial_pdu.append(slave_addr) - serial_pdu.extend(modbus_pdu) - - crc = self._calculate_crc16(serial_pdu) - serial_pdu.extend(crc) - - if self._ctrlPin: - self._ctrlPin(1) - self._uart.write(serial_pdu) - if self._ctrlPin: - while not self._uart.wait_tx_done(2): - machine.idle() - time.sleep_us(self._t35chars) - self._ctrlPin(0) - - def _send_receive(self, modbus_pdu, slave_addr, count): - # flush the Rx FIFO - self._uart.read() - self._send(modbus_pdu, slave_addr) - return self._validate_resp_hdr(self._uart_read(), slave_addr, modbus_pdu[0], count) - - def _validate_resp_hdr(self, response, slave_addr, function_code, count): - - if len(response) == 0: - raise OSError('no data received from slave') - - resp_crc = response[-Const.CRC_LENGTH:] - expected_crc = self._calculate_crc16(response[0:len(response) - Const.CRC_LENGTH]) - if (resp_crc[0] != expected_crc[0]) or (resp_crc[1] != expected_crc[1]): - raise OSError('invalid response CRC') - - if (response[0] != slave_addr): - raise ValueError('wrong slave address') - - if (response[1] == (function_code + Const.ERROR_BIAS)): - raise ValueError('slave returned exception code: {:d}'.format(response[2])) - - hdr_length = (Const.RESPONSE_HDR_LENGTH + 1) if count else Const.RESPONSE_HDR_LENGTH - - return response[hdr_length : len(response) - Const.CRC_LENGTH] + _rtu_send(self, modbus_pdu, slave_addr) - def read_coils(self, slave_addr, starting_addr, coil_qty): - modbus_pdu = functions.read_coils(starting_addr, coil_qty) - - resp_data = self._send_receive(modbus_pdu, slave_addr, True) - status_pdu = self._bytes_to_bool(resp_data) - - return status_pdu - - def read_discrete_inputs(self, slave_addr, starting_addr, input_qty): - modbus_pdu = functions.read_discrete_inputs(starting_addr, input_qty) - - resp_data = self._send_receive(modbus_pdu, slave_addr, True) - status_pdu = self._bytes_to_bool(resp_data) - - return status_pdu - - def read_holding_registers(self, slave_addr, starting_addr, register_qty, signed=True): - modbus_pdu = functions.read_holding_registers(starting_addr, register_qty) - - resp_data = self._send_receive(modbus_pdu, slave_addr, True) - register_value = self._to_short(resp_data, signed) - - return register_value - - def read_input_registers(self, slave_addr, starting_address, register_quantity, signed=True): - modbus_pdu = functions.read_input_registers(starting_address, register_quantity) - - resp_data = self._send_receive(modbus_pdu, slave_addr, True) - register_value = self._to_short(resp_data, signed) - - return register_value - - def write_single_coil(self, slave_addr, output_address, output_value): - modbus_pdu = functions.write_single_coil(output_address, output_value) - - resp_data = self._send_receive(modbus_pdu, slave_addr, False) - operation_status = functions.validate_resp_data(resp_data, Const.WRITE_SINGLE_COIL, - output_address, value=output_value, signed=False) - - return operation_status - - def write_single_register(self, slave_addr, register_address, register_value, signed=True): - modbus_pdu = functions.write_single_register(register_address, register_value, signed) - - resp_data = self._send_receive(modbus_pdu, slave_addr, False) - operation_status = functions.validate_resp_data(resp_data, Const.WRITE_SINGLE_REGISTER, - register_address, value=register_value, signed=signed) - - return operation_status - - def write_multiple_coils(self, slave_addr, starting_address, output_values): - modbus_pdu = functions.write_multiple_coils(starting_address, output_values) - - resp_data = self._send_receive(modbus_pdu, slave_addr, False) - operation_status = functions.validate_resp_data(resp_data, Const.WRITE_MULTIPLE_COILS, - starting_address, quantity=len(output_values)) - - return operation_status - - def write_multiple_registers(self, slave_addr, starting_address, register_values, signed=True): - modbus_pdu = functions.write_multiple_registers(starting_address, register_values, signed) - - resp_data = self._send_receive(modbus_pdu, slave_addr, False) - operation_status = functions.validate_resp_data(resp_data, Const.WRITE_MULTIPLE_REGISTERS, - starting_address, quantity=len(register_values)) - - return operation_status - - def send_response(self, slave_addr, function_code, request_register_addr, request_register_qty, request_data, values=None, signed=True): - modbus_pdu = functions.response(function_code, request_register_addr, request_register_qty, request_data, values, signed) - self._send(modbus_pdu, slave_addr) - - def send_exception_response(self, slave_addr, function_code, exception_code): - modbus_pdu = functions.exception_response(function_code, exception_code) - self._send(modbus_pdu, slave_addr) - - def get_request(self, unit_addr_list, timeout=None): - req = self._uart_read_frame(timeout) - - if len(req) < 8: + def poll(self, timeout=None): + req = _uart_read_frame(self, timeout) + if req is None or len(req) < 8: return None - - if req[0] not in unit_addr_list: - return None - req_crc = req[-Const.CRC_LENGTH:] req_no_crc = req[:-Const.CRC_LENGTH] - expected_crc = self._calculate_crc16(req_no_crc) + expected_crc = _calculate_crc16(req_no_crc) if (req_crc[0] != expected_crc[0]) or (req_crc[1] != expected_crc[1]): + print(f"Bad CRC - \n\tReceived - {hex(req_crc[0]) + hex(req_crc[1])[-2:]} \n\t Expected - {hex(expected_crc[0]) + hex(expected_crc[1])[-2:]}") return None try: - request = Request(self, req_no_crc) + return self.handle_request(req_no_crc) except ModbusException as e: self.send_exception_response(req[0], e.function_code, e.exception_code) + print(f"Modbus Exception - {e.function_code} - {e.exception_code}") return None - return request diff --git a/uModbus/tcp.py b/uModbus/tcp.py index cdb94ec..7efcc3c 100644 --- a/uModbus/tcp.py +++ b/uModbus/tcp.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python -# # Copyright (c) 2019, Pycom Limited. # # This software is licensed under the GNU GPL version 3 or any @@ -7,43 +5,46 @@ # see the Pycom Licence v1.0 document supplied with this file, or # available at https://www.pycom.io/opensource/licensing # -import uModBus.functions as functions +# Modified by FACTS Engineering 2023 + +import time +import struct +from random import getrandbits import uModBus.const as Const -from uModBus.common import Request +from uModBus.common import Server, Client from uModBus.common import ModbusException -import struct -import socket -import machine -import time -class TCP: - def __init__(self, slave_ip, slave_port=502, timeout=5): +class TCPClient(Client): + + def __init__(self, socket, server_ip, *, server_port=502, default_unit_id=255, timeout=5): + super().__init__(default_unit_id) self._sock = socket.socket() - self._sock.connect(socket.getaddrinfo(slave_ip, slave_port)[0][-1]) + self._addrinfo = socket.getaddrinfo(server_ip, server_port)[0][-1] + self.connect() self._sock.settimeout(timeout) + def connect(self): + """Connect to Server""" + self._sock.connect(self._addrinfo) + + def disconnect(self): + """Disconnect from server""" + self._sock.disconnect() + + @property + def connected(self): + """Return if socket is connected to the server""" + return self._sock._connected + def _create_mbap_hdr(self, slave_id, modbus_pdu): - trans_id = machine.rng() & 0xFFFF + trans_id = getrandbits(16) & 0xFFFF mbap_hdr = struct.pack('>HHHB', trans_id, 0, len(modbus_pdu) + 1, slave_id) return mbap_hdr, trans_id - def _bytes_to_bool(self, byte_list): - bool_list = [] - for index, byte in enumerate(byte_list): - bool_list.extend([bool(byte & (1 << n)) for n in range(8)]) - - return bool_list - - def _to_short(self, byte_array, signed=True): - response_quantity = int(len(byte_array) / 2) - fmt = '>' + (('h' if signed else 'H') * response_quantity) - - return struct.unpack(fmt, byte_array) - def _validate_resp_hdr(self, response, trans_id, slave_id, function_code, count=False): - rec_tid, rec_pid, rec_len, rec_uid, rec_fc = struct.unpack('>HHHBB', response[:Const.MBAP_HDR_LENGTH + 1]) + rec_tid, rec_pid, rec_len, rec_uid, rec_fc, rec_ec = struct.unpack('>HHHBBB', response[:Const.MBAP_HDR_LENGTH + 2]) if (trans_id != rec_tid): raise ValueError('wrong transaction Id') @@ -54,7 +55,7 @@ def _validate_resp_hdr(self, response, trans_id, slave_id, function_code, count= raise ValueError('wrong slave Id') if (rec_fc == (function_code + Const.ERROR_BIAS)): - raise ValueError('slave returned exception code: {:d}'.format(rec_fc)) + raise ValueError('slave returned exception code: {:d}'.format(rec_ec)) hdr_length = (Const.MBAP_HDR_LENGTH + 2) if count else (Const.MBAP_HDR_LENGTH + 1) @@ -64,173 +65,96 @@ def _send_receive(self, slave_id, modbus_pdu, count): mbap_hdr, trans_id = self._create_mbap_hdr(slave_id, modbus_pdu) self._sock.send(mbap_hdr + modbus_pdu) - response = self._sock.recv(256) - modbus_data = self._validate_resp_hdr(response, trans_id, slave_id, modbus_pdu[0], count) - - return modbus_data - - def read_coils(self, slave_addr, starting_addr, coil_qty): - modbus_pdu = functions.read_coils(starting_addr, coil_qty) - - response = self._send_receive(slave_addr, modbus_pdu, True) - status_pdu = self._bytes_to_bool(response) - - return status_pdu - - def read_discrete_inputs(self, slave_addr, starting_addr, input_qty): - modbus_pdu = functions.read_discrete_inputs(starting_addr, input_qty) - - response = self._send_receive(slave_addr, modbus_pdu, True) - status_pdu = self._bytes_to_bool(response) - - return status_pdu - - def read_holding_registers(self, slave_addr, starting_addr, register_qty, signed = True): - modbus_pdu = functions.read_holding_registers(starting_addr, register_qty) - - response = self._send_receive(slave_addr, modbus_pdu, True) - register_value = self._to_short(response, signed) + timeout = self._sock.gettimeout() + stamp = time.monotonic() - return register_value + while self._sock._available() < Const.MBAP_HDR_LENGTH and time.monotonic() - stamp < timeout: + pass - def read_input_registers(self, slave_addr, starting_address, register_quantity, signed = True): - modbus_pdu = functions.read_input_registers(starting_address, register_quantity) - - response = self._send_receive(slave_addr, modbus_pdu, True) - register_value = self._to_short(response, signed) - - return register_value - - def write_single_coil(self, slave_addr, output_address, output_value): - modbus_pdu = functions.write_single_coil(output_address, output_value) - - response = self._send_receive(slave_addr, modbus_pdu, False) - operation_status = functions.validate_resp_data(response, Const.WRITE_SINGLE_COIL, - output_address, value=output_value, signed=False) - - return operation_status - - def write_single_register(self, slave_addr, register_address, register_value, signed=True): - modbus_pdu = functions.write_single_register(register_address, register_value, signed) - - response = self._send_receive(slave_addr, modbus_pdu, False) - operation_status = functions.validate_resp_data(response, Const.WRITE_SINGLE_REGISTER, - register_address, value=register_value, signed=signed) - - return operation_status - - def write_multiple_coils(self, slave_addr, starting_address, output_values): - modbus_pdu = functions.write_multiple_coils(starting_address, output_values) - - response = self._send_receive(slave_addr, modbus_pdu, False) - operation_status = functions.validate_resp_data(response, Const.WRITE_MULTIPLE_COILS, - starting_address, quantity=len(output_values)) - - return operation_status - - def write_multiple_registers(self, slave_addr, starting_address, register_values, signed=True): - modbus_pdu = functions.write_multiple_registers(starting_address, register_values, signed) + response = self._sock.recv(Const.MAX_MSG_LENGTH) + if len(response) == 0: + raise TimeoutError("No response received") + modbus_data = self._validate_resp_hdr(response, trans_id, slave_id, modbus_pdu[0], count) - response = self._send_receive(slave_addr, modbus_pdu, False) - operation_status = functions.validate_resp_data(response, Const.WRITE_MULTIPLE_REGISTERS, - starting_address, quantity=len(register_values)) + return modbus_data - return operation_status -class TCPServer: +class TCPServer(Server): - def __init__(self): + def __init__(self, socket, local_ip, *, local_port=502, unit_addr=None, number_coils=None, number_discrete_inputs=None, + number_input_registers=None, number_holding_registers=None): + super().__init__( + unit_addr, + number_coils=number_coils, + number_discrete_inputs=number_discrete_inputs, + number_input_registers=number_input_registers, + number_holding_registers=number_holding_registers + ) self._sock = None self._client_sock = None + self._socket_source = socket + self._local_ip = local_ip + self._local_port = local_port - def bind(self, local_ip, local_port=502): - if self._client_sock: - self._client_sock.close() - if self._sock: - self._sock.close() - self._sock = socket.socket() - self._sock.bind(socket.getaddrinfo(local_ip, local_port)[0][-1]) + + def _listen(self): + self._sock.bind((self._local_ip, self._local_port)) self._sock.listen() + def _send(self, modbus_pdu, slave_addr): size = len(modbus_pdu) fmt = 'B' * size - adu = struct.pack('>HHHB' + fmt, self._req_tid, 0, size + 1, slave_addr, *modbus_pdu) - self._client_sock.send(adu) + adu = struct.pack('>HHHB', self._req_tid, 0, size + 1, slave_addr) + modbus_pdu + try: + self._client_sock.send(adu) + except Exception as e: + self._client_sock.close() + self._client_sock = None + raise e - def send_response(self, slave_addr, function_code, request_register_addr, request_register_qty, request_data, values=None, signed=True): - modbus_pdu = functions.response(function_code, request_register_addr, request_register_qty, request_data, values, signed) - self._send(modbus_pdu, slave_addr) - def send_exception_response(self, slave_addr, function_code, exception_code): - modbus_pdu = functions.exception_response(function_code, exception_code) - self._send(modbus_pdu, slave_addr) + def _accept_request(self, accept_timeout): - def _accept_request(self, accept_timeout, unit_addr_list): + start = time.monotonic() self._sock.settimeout(accept_timeout) - new_client_sock = None - try: - new_client_sock, client_address = self._sock.accept() - except OSError as e: - if e.args[0] != 11: # 11 = timeout expired - raise e - - if new_client_sock != None: - if self._client_sock != None: - self._client_sock.close() - self._client_sock = new_client_sock - self._client_sock.settimeout(0) # recv() timeout - - if self._client_sock != None: - try: - req = self._client_sock.recv(128) - if len(req) == 0: - return None - - req_header_no_uid = req[:Const.MBAP_HDR_LENGTH - 1] - self._req_tid, req_pid, req_len = struct.unpack('>HHH', req_header_no_uid) - req_uid_and_pdu = req[Const.MBAP_HDR_LENGTH - 1:Const.MBAP_HDR_LENGTH + req_len - 1] - + while time.monotonic() - start < accept_timeout and (self._client_sock == None or self._client_sock._socket_closed == True): + try: + self._client_sock, addr = self._sock.accept() except TimeoutError: - return None - except Exception as e: - print("Modbus request error:", e) - self._client_sock.close() - self._client_sock = None - return None - - if (req_pid != 0): - print("Modbus request error: PID not 0") - self._client_sock.close() - self._client_sock = None - return None - - if unit_addr_list != None and req_uid_and_pdu[0] not in unit_addr_list: - return None - - try: - return Request(self, req_uid_and_pdu) - except ModbusException as e: - self.send_exception_response(req[0], e.function_code, e.exception_code) - return None - - def get_request(self, unit_addr_list=None, timeout=None): - if self._sock == None: - raise Exception('Modbus TCP server not bound') - - if timeout > 0: - start_ms = time.ticks_ms() - elapsed = 0 - while True: - if self._client_sock == None: - accept_timeout = None if timeout == None else (timeout - elapsed) / 1000 - else: - accept_timeout = 0 - req = self._accept_request(accept_timeout, unit_addr_list) - if req: - return req - elapsed = time.ticks_diff(start_ms, time.ticks_ms()) - if elapsed > timeout: - return None - else: - return self._accept_request(0, unit_addr_list) + pass + if self._client_sock == None: + return None + self._client_sock.settimeout(.000001) + while time.monotonic() - start < accept_timeout: + try: + if self._client_sock._available() >= Const.MBAP_HDR_LENGTH: + break + except TimeoutError: + pass + if self._client_sock._available() < Const.MBAP_HDR_LENGTH: + return None + req = self._client_sock.recv(Const.MAX_MSG_LENGTH) + + req_header_no_uid = req[:Const.MBAP_HDR_LENGTH - 1] + self._req_tid, req_pid, req_len = struct.unpack('>HHH', req_header_no_uid) + req_uid_and_pdu = req[Const.MBAP_HDR_LENGTH - 1:Const.MBAP_HDR_LENGTH + req_len - 1] + if (req_pid != 0): + self._client_sock.close() + self._client_sock = None + return None + try: + r = self.handle_request(req_uid_and_pdu) + return r + except ModbusException as e: + # print("Modbus request error:", e) + self.send_exception_response(req[0], e.function_code, e.exception_code) + return None + + def poll(self, timeout=.000001): + if self._sock == None or self._sock._socket_closed == True: + self._sock = self._socket_source.socket() + self._listen() + self._sock.settimeout(.1) + + return self._accept_request(timeout)