Skip to content

Fix modbus serial #1675

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 41 additions & 21 deletions thingsboard_gateway/connectors/modbus/modbus_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

# Try import Pymodbus library or install it and import
installation_required = False
required_version = '3.0.0'
required_version = '3.0.2'
force_install = False

try:
Expand Down Expand Up @@ -160,7 +160,7 @@ def run(self):
try:
self.__log.info("Starting Modbus server")
self.__run_server()
self.__add_slave(self.__server.get_slave_config_format())
self.__server.slave = self.__add_slave(self.__server.get_slave_config_format())
self.__log.info("Modbus server started")
except Exception as e:
self.__log.exception('Failed to start Modbus server: %s', e)
Expand All @@ -177,7 +177,7 @@ def run(self):
self.__log.exception(e)

def __run_server(self):
self.__server = Server(self.__config['slave'], self.__log)
self.__server = Server(self, self.__config['slave'], self.__log)
self.__server.start()

def __get_master(self, slave: Slave):
Expand All @@ -204,10 +204,13 @@ def __get_master(self, slave: Slave):

def __add_slave(self, slave_config):
slave = Slave(self, self.__log, slave_config)
master = self.__get_master(slave)
slave.master = master
if slave.no_master:
master = self.__get_master(slave)
slave.master = master

self.__slaves.append(slave)

return slave

def __add_slaves(self, slaves_config):
for slave_config in slaves_config:
Expand All @@ -217,7 +220,7 @@ def __add_slaves(self, slaves_config):
self.__log.exception('Failed to add slave: %s', e)

self.__log.debug('Added %d slaves', len(self.__slaves))

@classmethod
def callback(cls, slave: Slave, queue: Queue):
queue.put_nowait(slave)
Expand All @@ -226,26 +229,28 @@ async def __process_requests(self):
while not self.__stopped:
try:
slave = self.process_device_requests.get_nowait()
await self.__poll_device(slave)
await self.__process_device(slave)
except Empty:
await asyncio.sleep(.01)
except Exception as e:
self.__log.exception('Failed to poll device: %s', e)

async def __poll_device(self, slave: Slave):
async def __process_device(self, slave: Slave):
self.__log.debug("Polling %s slave", slave)

# check if device have attributes or telemetry to poll
if slave.uplink_converter_config.attributes or slave.uplink_converter_config.telemetry:
try:
connected_to_master = await slave.connect()

if slave.no_master:
connected_to_master = True
else:
connected_to_master = await slave.connect()

if connected_to_master:
self.__manage_device_connectivity_to_platform(slave)

if connected_to_master:
slave_data = await self.__read_slave_data(slave)
self.__data_to_convert.put_nowait((slave, slave_data))
if slave_data:
self.__data_to_convert.put_nowait((slave, slave_data))
else:
self.__log.error('Socket is closed, connection is lost, for device %s', slave)
self.__delete_device_from_platform(slave)
Expand All @@ -270,7 +275,14 @@ async def __read_slave_data(self, slave: Slave):
for config_section in ('attributes', 'telemetry'):
for config in getattr(slave.uplink_converter_config, config_section):
try:
response = await slave.read(config['functionCode'], config['address'], config['objectsCount'])
if slave.no_master:
if config['address'] in self.__server.addresses_updated:
response = self.__server.read(config['functionCode'], config['address'], config['objectsCount'])
else:
continue
else:
response = await slave.read(config['functionCode'], config['address'], config['objectsCount'])

except asyncio.exceptions.TimeoutError:
self.__log.error("Timeout error for device %s function code %s address %s",
slave.device_name, config['functionCode'], config[ADDRESS_PARAMETER])
Expand Down Expand Up @@ -319,12 +331,20 @@ def __convert_data(self):
batch_to_convert[batch_key] = []

batch_to_convert[batch_key].append(data)

for (device_name, uplink_converter), data in batch_to_convert.items():
converted_data: ConvertedData = uplink_converter.convert({}, data)
self.__log.trace("Converted data: %r", converted_data)
if len(converted_data['attributes']) or len(converted_data['telemetry']):
self.__data_to_save.put_nowait(converted_data)

self.__log.trace("Data read from %s slave: %s", slave, data)

if slave.no_master:
converted_data = ConvertedData(slave.device_name, slave.device_type)
converted_data.add_to_attributes(data['attributes'])
converted_data.add_to_telemetry(data['telemetry'])
else:
for (device_name, uplink_converter), data in batch_to_convert.items():
converted_data: ConvertedData = uplink_converter.convert({}, data)
self.__log.trace("Converted data: %r", converted_data)

if len(converted_data['attributes']) or len(converted_data['telemetry']):
self.__data_to_save.put_nowait(converted_data)
else:
sleep(.001)
except Exception as e:
Expand All @@ -334,7 +354,7 @@ def __save_data(self):
while not self.__stopped:
if not self.__data_to_save.empty():
try:
converted_data = self.__data_to_save.get_nowait()
converted_data: ConvertedData = self.__data_to_save.get_nowait()
StatisticsService.count_connector_message(self.get_name(), stat_parameter_name='storageMsgPushed')
self.__gateway.send_to_storage(self.get_name(), self.get_id(), converted_data)
self.statistics[STATISTIC_MESSAGE_SENT_PARAMETER] += 1
Expand Down
68 changes: 60 additions & 8 deletions thingsboard_gateway/connectors/modbus/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import TYPE_CHECKING, Dict, Union

import asyncio
from asyncio import CancelledError
from threading import Thread
Expand All @@ -31,6 +33,11 @@
from thingsboard_gateway.connectors.modbus.constants import ADDRESS_PARAMETER, BYTE_ORDER_PARAMETER, FUNCTION_CODE_SLAVE_INITIALIZATION, FUNCTION_TYPE, \
FUNCTION_CODE_READ, HOST_PARAMETER, IDENTITY_SECTION, METHOD_PARAMETER, OBJECTS_COUNT_PARAMETER, PORT_PARAMETER, REPACK_PARAMETER, SERIAL_CONNECTION_TYPE_PARAMETER, TAG_PARAMETER, WORD_ORDER_PARAMETER
from thingsboard_gateway.gateway.constants import DEVICE_NAME_PARAMETER, TYPE_PARAMETER
from thingsboard_gateway.gateway.statistics.statistics_service import StatisticsService

if TYPE_CHECKING:
from thingsboard_gateway.connectors.modbus.modbus_connector import ModbusConnector
from thingsboard_gateway.connectors.modbus.modbus_connector import Slave

SLAVE_TYPE = {
'tcp': StartAsyncTcpServer,
Expand All @@ -47,26 +54,40 @@


class Server(Thread):
def __init__(self, config, logger):
class CallbackDatablock(ModbusSparseDataBlock):
def __init__(self, values, callback):
self.callback = callback
super().__init__(values)

def setValues(self, address, value):
self.callback(address, len(value))
super().setValues(address, value)

def __init__(self, connector: 'ModbusConnector', config, logger):
super().__init__()
self.__stopped = False
self.daemon = True
self.name = 'Gateway Modbus Server (Slave)'


self.connector = connector
self.__log = logger

self.__config = config

self.connector = connector
self.device_name = config.get('deviceName', 'Modbus Slave')
self.device_type = config.get('deviceType', 'default')
self.poll_period = config.get('pollPeriod', 5000)

self.__type = config.get('type', 'tcp').lower()
self.__identity = self.__get_identity(self.__config)
self.__server_context = self.__get_server_context(self.__config)
self.__server_context, self.__datablock = self.__get_server_context(self.__config)
self.__connection_config = self.__get_connection_config(self.__config)
self.__log.trace("Connection config loaded: %s", self.__connection_config)

self.no_master = False
self.__server = None
self.slave: Slave = None
self.addresses_updated = set()

try:
self.loop = asyncio.new_event_loop()
Expand All @@ -76,6 +97,31 @@ def __init__(self, config, logger):

def __str__(self):
return self.name

def __callback(self, address, count):
if self.slave:
for n in range(count):
self.addresses_updated.add(address - 1 + n)
self.__log.trace("Updated addresses: %s", self.addresses_updated)
try:
self.slave.connector.callback(self.slave, self.slave.connector.process_device_requests)
except Exception as e:
self.__log.exception('Error sending slave callback from Server: %s', e)

def read(self, function_code, address, objects_count):
self.__log.debug('Reading %s registers from address %s with function code %s', objects_count, address,
function_code)

result, = self.__datablock.getValues(function_code, address, objects_count)

self.addresses_updated.remove(address)

StatisticsService.count_connector_message(self.connector.get_name(),
stat_parameter_name='connectorMsgsReceived')
StatisticsService.count_connector_bytes(self.connector.get_name(), result,
stat_parameter_name='connectorBytesReceived')

return result

def run(self):
try:
Expand Down Expand Up @@ -109,6 +155,9 @@ async def start_server(self):
self.__server = await SLAVE_TYPE[self.__type](identity=self.__identity, context=self.__server_context,
**self.__connection_config, defer_start=True,
allow_reuse_address=True, allow_reuse_port=True)

if self.__config[TYPE_PARAMETER] == SERIAL_CONNECTION_TYPE_PARAMETER:
await self.__server.start()
await self.__server.serve_forever()
except Exception as e:
self.__stopped = True
Expand All @@ -123,7 +172,8 @@ def get_slave_config_format(self):
**self.__config,
'deviceName': self.device_name,
'deviceType': self.device_type,
'pollPeriod': self.poll_period
'pollPeriod': self.poll_period,
'no_master': True # shows that this slave don't have a master
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like it always be 'no_master': True. Because you at first unpack config and then overwrite no_master it with True.

Also are you sure that we actually need no_master parameter in general? Because if connector can reads data from data blocks - master will be no longer required

Copy link
Author

@palandri palandri Feb 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think you are right in this regard. I tried to modify as little as possible of the Slave class, to not break stuff, so I created the 'no_master' parameter to keep the current implementantion of the Slave class working in polling mode. But it isnt a parameter of the Server class, but Slave class, initialized here. When get_slave_config_format is called, it creates a Slave object (inside Server context), with no_master = True. If the Slave object is called outside of Server context, no_master do not exists, so it defaults to False.

With the gateway working as a Client, it makes sense to poll Servers (which is triggered by the Slave-class object).
On the other hand, with the gateway working as a Server, polling doesn't make sense anymore (as the Server need to react to when data is sent to it).

So, 'no_master' simply control if Slave' internal timer is to be activated or not.

I think a better implementation of it would be to change Slave class to initialize the timer based on poll_period parameter (for example, if poll_period = 0, do not poll).

}

for (register, register_values) in self.__config.get('values', {}).items():
Expand Down Expand Up @@ -179,7 +229,7 @@ def __get_server_context(self, config):
blocks = {}
if (config.get('values') is None) or (not len(config.get('values'))):
self.__log.error("No values to read from device %s", config.get(DEVICE_NAME_PARAMETER, 'Modbus Slave'))
return
return None, None

for (key, value) in config.get('values').items():
values = {}
Expand Down Expand Up @@ -210,13 +260,15 @@ def __get_server_context(self, config):
except Exception as e:
self.__log.error("Failed to configure value %s with error: %s, skipping...", item['value'], e)


try:
if len(values):
blocks[FUNCTION_TYPE[key]] = ModbusSparseDataBlock(values)
blocks[FUNCTION_TYPE[key]] = self.CallbackDatablock(values, callback=self.__callback)
except Exception as e:
self.__log.error("Failed to configure block %s with error: %s", key, e)

if not len(blocks):
self.__log.info("%s - will be initialized without values", config.get(DEVICE_NAME_PARAMETER, 'Modbus Slave'))

return ModbusServerContext(slaves=ModbusSlaveContext(**blocks), single=True)
datablock = ModbusSlaveContext(**blocks)
return ModbusServerContext(slaves=datablock, single=True), datablock
4 changes: 3 additions & 1 deletion thingsboard_gateway/connectors/modbus/slave.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def __init__(self, connector: 'ModbusConnector', logger, config):

self.callback = connector.callback

self.no_master = config.get('no_master', False)
self.unit_id = config[UNIT_ID_PARAMETER]
self.host = config.get(HOST_PARAMETER)
self.port = config[PORT_PARAMETER]
Expand Down Expand Up @@ -110,7 +111,8 @@ def __init__(self, connector: 'ModbusConnector', logger, config):
for attr_config in self.attributes_updates_config:
self.shared_attributes_keys.append(attr_config[TAG_PARAMETER])

self.start()
if not self.no_master:
self.start()

def __timer(self):
self.__send_callback()
Expand Down