Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for modbus over serial (RTU) #41

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions modbus4mqtt/modbus4mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def __init__(self, hostname, port, username, password, config_file, mqtt_topic_p
self.prefix = mqtt_topic_prefix
self.address_offset = self.config.get('address_offset', 0)
self.registers = self.config['registers']
self.default_slave = self.config.get('slave', 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for making these safer - I like it

for register in self.registers:
register['address'] += self.address_offset
self.modbus_connect_retries = -1 # Retry forever by default
Expand All @@ -47,12 +48,14 @@ def connect_modbus(self):
else:
word_order = modbus_interface.WordOrder.HighLow

self._mb = modbus_interface.modbus_interface(self.config['ip'],
self._mb = modbus_interface.modbus_interface(self.config.get('ip', None),
self.config.get('port', 502),
self.config.get('update_rate', 5),
variant=self.config.get('variant', None),
scan_batching=self.config.get('scan_batching', None),
word_order=word_order)
word_order=word_order,
baudrate=self.config.get('baudrate', 19200),
method=self.config.get('method', 'rtu'))
failed_attempts = 1
while self._mb.connect():
logging.warning("Modbus connection attempt {} failed. Retrying...".format(failed_attempts))
Expand All @@ -65,7 +68,8 @@ def connect_modbus(self):
sleep(self.modbus_reconnect_sleep_interval)
# Tells the modbus interface about the registers we consider interesting.
for register in self.registers:
self._mb.add_monitor_register(register.get('table', 'holding'), register['address'], register.get('type', 'uint16'))
slave = register.get('slave', self.default_slave)
self._mb.add_monitor_register(register.get('table', 'holding'), slave, register['address'], register.get('type', 'uint16'))
register['value'] = None

def modbus_connection_failed(self):
Expand Down Expand Up @@ -103,6 +107,7 @@ def poll(self):
for register in self._get_registers_with('pub_topic'):
try:
value = self._mb.get_value( register.get('table', 'holding'),
register.get('slave', self.default_slave),
register['address'],
register.get('type', 'uint16'))
except Exception:
Expand Down Expand Up @@ -195,7 +200,8 @@ def _on_message(self, client, userdata, msg):
"Bad/missing value_map? Topic: {}, Value: {}".format(topic, value))
continue
type = register.get('type', 'uint16')
self._mb.set_value(register.get('table', 'holding'), register['address'], int(value),
slave = register.get('slave', self.default_slave)
self._mb.set_value(register.get('table', 'holding'), slave, register['address'], int(value),
register.get('mask', 0xFFFF), type)

# This throws ValueError exceptions if the imported registers are invalid
Expand Down
84 changes: 46 additions & 38 deletions modbus4mqtt/modbus_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
from enum import Enum
import logging
from queue import Queue
from pymodbus.client.sync import ModbusTcpClient, ModbusSocketFramer
from pymodbus.client.sync import ModbusSerialClient, ModbusTcpClient, ModbusSocketFramer
from pymodbus import exceptions
from SungrowModbusTcpClient import SungrowModbusTcpClient
from collections import defaultdict

DEFAULT_SCAN_RATE_S = 5
DEFAULT_SCAN_BATCHING = 100
Expand All @@ -20,15 +21,17 @@ class WordOrder(Enum):

class modbus_interface():

def __init__(self, ip, port=502, update_rate_s=DEFAULT_SCAN_RATE_S, variant=None, scan_batching=None, word_order=WordOrder.HighLow):
def __init__(self, ip=None, port=502, update_rate_s=DEFAULT_SCAN_RATE_S, variant=None, scan_batching=None, word_order=WordOrder.HighLow, method='rtu', baudrate=9600):
self._ip = ip
self._port = port
self._method = method
self._baudrate = baudrate
# This is a dict of sets. Each key represents one table of modbus registers.
# At the moment it has 'input' and 'holding'
self._tables = {'input': set(), 'holding': set()}
self._tables = defaultdict(lambda: {'input': set(), 'holding': set()})

# This is a dicts of dicts. These hold the current values of the interesting registers
self._values = {'input': {}, 'holding': {}}
self._values = defaultdict(lambda: {'input': {}, 'holding': {}})

self._planned_writes = Queue()
self._writing = False
Expand All @@ -54,59 +57,64 @@ def connect(self):
self._mb = SungrowModbusTcpClient.SungrowModbusTcpClient(host=self._ip, port=self._port,
framer=ModbusSocketFramer, timeout=1,
RetryOnEmpty=True, retries=1)
elif self._variant == 'serial':
self._mb = ModbusSerialClient(method=self._method, port=self._port, baudrate=self._baudrate,
bytesize=8, parity='E', stopbits=1,
timeout=1, retries=1)
else:
self._mb = ModbusTcpClient(self._ip, self._port,
framer=ModbusSocketFramer, timeout=1,
RetryOnEmpty=True, retries=1)

def add_monitor_register(self, table, addr, type='uint16'):
def add_monitor_register(self, table, slave, addr, type='uint16'):
# Accepts a modbus register and table to monitor
if table not in self._tables:
if table not in self._tables[slave]:
raise ValueError("Unsupported table type. Please only use: {}".format(self._tables.keys()))
# Register enough sequential addresses to fill the size of the register type.
# Note: Each address provides 2 bytes of data.
for i in range(type_length(type)):
self._tables[table].add(addr+i)
self._tables[slave][table].add(addr+i)

def poll(self):
# Polls for the values marked as interesting in self._tables.
for table in self._tables:
# This batches up modbus reads in chunks of self._scan_batching
start = -1
for k in sorted(self._tables[table]):
group = int(k) - int(k) % self._scan_batching
if (start < group):
try:
values = self._scan_value_range(table, group, self._scan_batching)
for x in range(0, self._scan_batching):
key = group + x
self._values[table][key] = values[x]
# Avoid back-to-back read operations that could overwhelm some modbus devices.
sleep(DEFAULT_READ_SLEEP_S)
except ValueError as e:
logging.exception("{}".format(e))
start = group + self._scan_batching-1
for slave in self._tables:
for table in self._tables[slave]:
# This batches up modbus reads in chunks of self._scan_batching
start = -1
for k in sorted(self._tables[slave][table]):
group = int(k) - int(k) % self._scan_batching
if (start < group):
try:
values = self._scan_value_range(table, slave, group, self._scan_batching)
for x in range(0, self._scan_batching):
key = group + x
self._values[slave][table][key] = values[x]
# Avoid back-to-back read operations that could overwhelm some modbus devices.
sleep(DEFAULT_READ_SLEEP_S)
except ValueError as e:
logging.exception("{}".format(e))
start = group + self._scan_batching-1
self._process_writes()

def get_value(self, table, addr, type='uint16'):
if table not in self._values:
raise ValueError("Unsupported table type. Please only use: {}".format(self._values.keys()))
if addr not in self._values[table]:
def get_value(self, table, slave, addr, type='uint16'):
if table not in self._values[slave]:
raise ValueError("Unsupported table type. Please only use: {}".format(self._values[slave].keys()))
if addr not in self._values[slave][table]:
raise ValueError("Unpolled address. Use add_monitor_register(addr, table) to add a register to the polled list.")
# Read sequential addresses to get enough bytes to satisfy the type of this register.
# Note: Each address provides 2 bytes of data.
value = bytes(0)
type_len = type_length(type)
for i in range(type_len):
if self._word_order == WordOrder.HighLow:
data = self._values[table][addr + i]
data = self._values[slave][table][addr + i]
else:
data = self._values[table][addr + (type_len-i-1)]
data = self._values[slave][table][addr + (type_len-i-1)]
value += data.to_bytes(2,'big')
value = _convert_from_bytes_to_type(value, type)
return value

def set_value(self, table, addr, value, mask=0xFFFF, type='uint16'):
def set_value(self, table, slave, addr, value, mask=0xFFFF, type='uint16'):
if table != 'holding':
# I'm not sure if this is true for all devices. I might support writing to coils later,
# so leave this door open.
Expand All @@ -121,7 +129,7 @@ def set_value(self, table, addr, value, mask=0xFFFF, type='uint16'):
value = _convert_from_bytes_to_type(bytes_to_write[i*2:i*2+2], 'uint16')
else:
value = _convert_from_bytes_to_type(bytes_to_write[(type_len-i-1)*2:(type_len-i-1)*2+2], 'uint16')
self._planned_writes.put((addr+i, value, mask))
self._planned_writes.put((slave, addr+i, value, mask))

self._process_writes()

Expand All @@ -136,9 +144,9 @@ def _process_writes(self, max_block_s=DEFAULT_WRITE_BLOCK_INTERVAL_S):
try:
self._writing = True
while not self._planned_writes.empty() and (time() - write_start_time) < max_block_s:
addr, value, mask = self._planned_writes.get()
slave, addr, value, mask = self._planned_writes.get()
if mask == 0xFFFF:
self._mb.write_register(addr, value, unit=0x01)
self._mb.write_register(addr, value, unit=slave)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for fixing up the hard-coded values

else:
# https://pymodbus.readthedocs.io/en/latest/source/library/pymodbus.client.html?highlight=mask_write_register#pymodbus.client.common.ModbusClientMixin.mask_write_register
# https://www.mathworks.com/help/instrument/modify-the-contents-of-a-holding-register-using-a-mask-write.html
Expand All @@ -150,11 +158,11 @@ def _process_writes(self, max_block_s=DEFAULT_WRITE_BLOCK_INTERVAL_S):
# I suspect it's a different modbus opcode that tries to do clever things that my device doesn't support.
# result = self._mb.mask_write_register(address=addr, and_mask=(1<<16)-1-mask, or_mask=value, unit=0x01)
# print("Result: {}".format(result))
old_value = self._scan_value_range('holding', addr, 1)[0]
old_value = self._scan_value_range('holding', slave, addr, 1)[0]
and_mask = (1<<16)-1-mask
or_mask = value
new_value = (old_value & and_mask) | (or_mask & (mask))
self._mb.write_register(addr, new_value, unit=0x01)
self._mb.write_register(addr, new_value, unit=slave)
sleep(DEFAULT_WRITE_SLEEP_S)
except Exception as e:
# BUG catch only the specific exception that means pymodbus failed to write to a register
Expand All @@ -163,12 +171,12 @@ def _process_writes(self, max_block_s=DEFAULT_WRITE_BLOCK_INTERVAL_S):
finally:
self._writing = False

def _scan_value_range(self, table, start, count):
def _scan_value_range(self, table, slave, start, count):
result = None
if table == 'input':
result = self._mb.read_input_registers(start, count, unit=0x01)
result = self._mb.read_input_registers(start, count, unit=slave)
elif table == 'holding':
result = self._mb.read_holding_registers(start, count, unit=0x01)
result = self._mb.read_holding_registers(start, count, unit=slave)
try:
return result.registers
except:
Expand Down