Server: Modbus/TCP serial gateway

#!/usr/bin/env python3

"""
Modbus/TCP basic gateway (RTU slave(s) attached)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

[pyModbusTCP server] -> [ModbusSerialWorker] -> [serial RTU devices]

Run this as root to listen on TCP privileged ports (< 1024).

Open /dev/ttyUSB0 at 115200 baud and relay RTU messages to the attached slave(s).
$ sudo ./server_serial_gw.py --baudrate 115200 /dev/ttyUSB0
"""

import argparse
import logging
import queue
import struct
from threading import Event
from queue import Queue
from pyModbusTCP.server import ModbusServer
from pyModbusTCP.utils import crc16
from pyModbusTCP.constants import EXP_GATEWAY_PATH_UNAVAILABLE, EXP_GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND
# need sudo pip install pyserial==3.4
from serial import Serial, serialutil


# RTU frame, query container and serial worker classes
class ModbusRTUFrame:
    """Holder for one raw Modbus RTU frame laid out as [address] + PDU + [CRC 16]."""

    def __init__(self, raw=b''):
        # public: the complete frame as bytes (empty until set or built)
        self.raw = raw

    @property
    def pdu(self):
        """PDU bytes: the frame minus its first byte (address) and last two bytes (CRC)."""
        frame = self.raw
        return frame[1:-2]

    @property
    def slave_address(self):
        """Slave address: first byte of the frame (as an int)."""
        frame = self.raw
        return frame[0]

    @property
    def function_code(self):
        """Function code: second byte of the frame (as an int)."""
        frame = self.raw
        return frame[1]

    @property
    def is_valid(self):
        """Tell if the frame looks like a well-formed RTU message.

        A frame is accepted when it is longer than 4 bytes and the CRC computed
        over the whole frame (CRC bytes included) equals 0.

        :return: True if frame is valid
        :rtype: bool
        """
        frame = self.raw
        # too short: reject without computing any CRC
        if len(frame) <= 4:
            return False
        return crc16(frame) == 0

    def build(self, raw_pdu, slave_ad):
        """Assemble a complete RTU message and store it in self.raw.

        :param raw_pdu: modbus PDU as raw bytes
        :type raw_pdu: bytes
        :param slave_ad: address of the slave (packed as one unsigned byte)
        :type slave_ad: int
        """
        # [address] + PDU
        addressed_pdu = struct.pack('B', slave_ad) + raw_pdu
        # [CRC 16] of the above, packed as a little-endian unsigned 16 bits value
        crc_field = struct.pack('<H', crc16(addressed_pdu))
        # [address] + PDU + [CRC 16]
        self.raw = addressed_pdu + crc_field


class RtuQuery:
    """ One serial exchange: a request frame, its response frame and a completion flag. """

    def __init__(self):
        # frame to send and frame received in return, both start empty
        self.request, self.response = ModbusRTUFrame(), ModbusRTUFrame()
        # event set once the exchange has been processed
        self.completed = Event()


class ModbusSerialWorker:
    """ A serial worker to manage I/O with RTU devices.

    Requests enter through srv_engine_entry(), which queues them and blocks
    until loop() has processed them one at a time on the serial port.
    """

    def __init__(self, port, timeout=1.0, end_of_frame=0.05):
        """Set up the worker.

        :param port: serial port object, it must provide reset_input_buffer(),
            write(), read() and a writable timeout attribute
        :param timeout: delay to wait for the first byte of a response (in s)
        :type timeout: float
        :param end_of_frame: silence delay that marks the end of a response (in s)
        :type end_of_frame: float
        """
        # public
        self.serial_port = port
        self.timeout = timeout
        self.end_of_frame = end_of_frame
        # internal request queue
        # hold up to 5 pending requests, when it is full srv_engine_entry()
        # answers with a gateway exception instead of queuing
        self.rtu_queries_q = Queue(maxsize=5)

    def _exchange(self, tx_raw):
        """Send a raw frame on the serial port and collect the raw reply.

        :param tx_raw: frame to send
        :type tx_raw: bytes
        :return: all bytes received (empty if nothing came before timeout)
        :rtype: bytes
        """
        port = self.serial_port
        # send to serial (drop pending input first so it can't be mixed with the reply)
        port.reset_input_buffer()
        port.write(tx_raw)
        # receive from serial
        # wait for first byte of data until timeout delay
        port.timeout = self.timeout
        rx_raw = port.read(1)
        # if ok, wait for the remaining
        if rx_raw:
            port.timeout = self.end_of_frame
            # wait for next bytes of data until end of frame delay
            while True:
                rx_chunk = port.read(256)
                if not rx_chunk:
                    break
                rx_raw += rx_chunk
        return rx_raw

    def loop(self):
        """Serial worker main loop.

        Never returns on its own. An exception raised by the serial port is
        propagated to the caller, but only after the request being processed
        has been marked as completed.
        """
        while True:
            # get next exchange from queue
            rtu_query = self.rtu_queries_q.get()
            try:
                rtu_query.response.raw = self._exchange(rtu_query.request.raw)
            finally:
                # mark all as done, even on serial error: without this the
                # caller blocked on completed.wait() would never be released
                # (the response is then left untouched)
                rtu_query.completed.set()
                self.rtu_queries_q.task_done()

    def srv_engine_entry(self, session_data):
        """Server engine entry point (pass request to serial worker queries queue).

        Blocks until the serial worker has processed the request. On success the
        response PDU is copied to session_data, otherwise a modbus exception is
        built: gateway path unavailable when the queue is full, gateway target
        device failed to respond when no valid frame was received.

        :param session_data: server session data
        :type session_data: ModbusServer.SessionData
        """
        # init a serial exchange from session data
        rtu_query = RtuQuery()
        rtu_query.request.build(raw_pdu=session_data.request.pdu.raw,
                                slave_ad=session_data.request.mbap.unit_id)
        try:
            # add a request in the serial worker queue, can raise queue.Full
            self.rtu_queries_q.put(rtu_query, block=False)
        except queue.Full:
            # except status for overloaded gateway
            exp_status = EXP_GATEWAY_PATH_UNAVAILABLE
        else:
            # wait result
            rtu_query.completed.wait()
            # check receive frame status
            if rtu_query.response.is_valid:
                session_data.response.pdu.raw = rtu_query.response.pdu
                return
            # except status for slave failed to respond
            exp_status = EXP_GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND
        # return modbus exception
        func_code = rtu_query.request.function_code
        session_data.response.pdu.build_except(func_code=func_code, exp_status=exp_status)


if __name__ == '__main__':
    # sys.exit() instead of the exit() helper: the latter is injected by the
    # site module for interactive use and is not guaranteed to exist
    import sys

    # parse args
    parser = argparse.ArgumentParser()
    parser.add_argument('device', type=str, help='serial device (like /dev/ttyUSB0)')
    parser.add_argument('-H', '--host', type=str, default='localhost', help='host (default: localhost)')
    parser.add_argument('-p', '--port', type=int, default=502, help='TCP port (default: 502)')
    parser.add_argument('-b', '--baudrate', type=int, default=9600, help='serial rate (default is 9600)')
    parser.add_argument('-t', '--timeout', type=float, default=1.0, help='timeout delay (default is 1.0 s)')
    parser.add_argument('-e', '--eof', type=float, default=0.05, help='end of frame delay (default is 0.05 s)')
    parser.add_argument('-d', '--debug', action='store_true', help='set debug mode')
    args = parser.parse_args()
    # init logging (level=None keeps the default logging level)
    logging.basicConfig(level=logging.DEBUG if args.debug else None)
    logger = logging.getLogger(__name__)
    try:
        # init serial port
        logger.debug('Open serial port %s at %d bauds', args.device, args.baudrate)
        serial_port = Serial(port=args.device, baudrate=args.baudrate)
        # init serial worker
        serial_worker = ModbusSerialWorker(serial_port, args.timeout, args.eof)
        # start modbus server with custom engine: every request received by the
        # server is handed to the serial worker instead of being served locally
        logger.debug('Start modbus server (%s, %d)', args.host, args.port)
        srv = ModbusServer(host=args.host, port=args.port,
                           no_block=True, ext_engine=serial_worker.srv_engine_entry)
        srv.start()
        # start serial worker loop (blocks here, the loop has no exit condition)
        logger.debug('Start serial worker')
        serial_worker.loop()
    except serialutil.SerialException as e:
        # exit status 1: serial device failure
        logger.critical('Serial device error: %r', e)
        sys.exit(1)
    except ModbusServer.Error as e:
        # exit status 2: modbus server failure
        logger.critical('Modbus server error: %r', e)
        sys.exit(2)