10. Modbus协议

在前面,我们介绍了鲁班猫板卡上的各种对外连接的接口, 大家可以发现,其中的一些接口在工业领域应用的是十分广泛的。 那么我们鲁班猫板卡,能否在这些接口的基础上,再拓展出一些强大的功能呢?

本节实验中,我们就来给大家介绍一下在工业领域中,十分常见工业总线协议:Modbus通讯协议。

10.1. Modbus协议实验

我们的鲁班猫板卡系统有丰富的开源软件支持,像我们本节要介绍的Modbus通讯协议, 也有对应的软件可以供我们安装使用。

本节实验,我们的目标是使用Python语言及Python库-python3-pymodbus, 来完成Modbus协议中约定的RTU主站及从站功能代码的编写。

那么我们来简单了解一下python3-pymodbus库。

10.2. python3-pymodbus库

pymodbus是基于BSD开源协议的一个的Modbus协议Python库。 它的功能十分强大,实现了Modbus协议中约定的所有功能,并且对通讯主机以同步及异步(asyncio、tornado、twisted)的方式进行了实现, 在拥有不错的性能的同时,也为Python开发者在构建Modbus协议应用时,对应用功能进行额外拓展提供了更多可能。

pymodbus支持以太网、串行接口来承载Modbus协议的物理层传输, 如果你不使用串行接口(pymodbus串行接口的实现依赖于pyserial库包)。 那么python3-pymodbus库完全不依赖与第三方库包,所以该库是十分轻量的。

我们可以在鲁班猫板卡上安装python3-pymodbus库, 并通过一些官方提供的示例代码来使用该库,完成本节的实验。

10.3. 实验准备

10.3.1. 添加uart资源

重要

本节实验使用 两个uart串口(一主一从) 来适配Modbus底层硬件的连接, 故此小节为添加uart资源演示,pymodbus库提供Modbus协议的全栈实现, 所以底层使用网口通讯也是得到支持的,大家可以根据实际情况参考本小节内容与否。

在板卡上的部分资源可能默认未被开启,在使用前请根据需要修改 /boot/uEnv.txt 文件, 可添加对应设备树插件的加载,重启系统,以在系统中添加对应资源。

如本节实验中,通常情况下在鲁班猫系统中默认使能 uart 的功能, 如大家发现uart的设备树插件未加载,请做相应修改。

同时如果大家在使用中发现uart功能引脚被复用或有冲突,请要将对应占用uart引脚资源的设备树插件取消加载, 否则uart资源可能无法使用。若已开启对应资源,忽略以下步骤。

本节实验中,笔者使用鲁班猫i.MX6ULL Pro板卡为例演示, 板卡上uart的设备树插件和rs485、can设备树插件存在引脚冲突, 因此在启用uart设备树插件时,需要关闭rs485、can设备树插件。 找到uart、rs485、can相关设备树插件修改,修改后并重启开发板。

方法参考如下:

broken

添加uart2、uart3设备树插件,以使系统支持uart2、uart3功能,如下:

broken
# 以鲁班猫i.MX6ULL Pro板卡uart2、uart3设备树插件内容为例:
dtoverlay=/usr/lib/linux-image-4.19.35-imx6/overlays/imx-fire-uart2.dtbo
dtoverlay=/usr/lib/linux-image-4.19.35-imx6/overlays/imx-fire-uart3.dtbo

如若运行代码时出现“Device or resource busy”或者运行代码卡死等等现象, 请按上述情况检查并按上述步骤操作。

如出现 Permission denied 或类似字样,请注意用户权限,大部分操作硬件外设的功能,几乎都需要root用户权限,简单的解决方案是在执行语句前加入sudo或以root用户运行程序。

在启动文件中添加好设备资源并重启板卡之后,我们可以查看设备是否已经正常添加。

# 在终端中输入如下命令,可以查看到uart2、uart3设备资源:
ls /dev/ttymxc*

检查板卡上的uart2、uart3设备资源示例,仅供参考:

broken

上图中显示了新增的两个串口设备,会在后文使用到。

10.3.2. 硬件连接

硬件连接小节,需要大家根据自身开发环境等情况对本章内容进行参考调整。

本章中笔者会使用两个串口:uart2、uart3,一主一从为大家进行Modbus协议 数据传输的演示。

笔者连接如下:

broken

重要

pymodbus库对串行设备的支持,是基于pyserial库实现的, 该库默认的串行设备支持,是基于uart,故本节也以uart进行演示实验。 如想pymodbus使用rs485底层电气连接的方式,需要自行修改pymodbus库底层源码。

10.3.3. pymodbus库安装

# 在终端中输入如下命令:
sudo apt -y install python3-pymodbus

10.3.4. pymodbus库使用

安装好对应的库之后,我们就可以利用安装好的python3-pymodbus库编写一下测试代码。

10.4. 测试代码

10.4.1. RTU主机-master_client.py

代码如下:

配套代码 io/modbus/master_client.py文件内容
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#!/usr/bin/env python
"""
Pymodbus Asynchronous Client Examples
--------------------------------------------------------------------------

The following is an example of how to use the asynchronous serial modbus
client implementation from pymodbus with ayncio.

The example is only valid on Python3.4 and above
"""
import os
from pymodbus.compat import IS_PYTHON3, PYTHON_VERSION

if IS_PYTHON3 and PYTHON_VERSION >= (3, 4):
    import logging
    import asyncio
    from pymodbus.client.asynchronous.serial import (
        AsyncModbusSerialClient as ModbusClient,
    )
    from pymodbus.client.asynchronous import schedulers
else:
    import sys

    sys.stderr("This example needs to be run only on python 3.4 and above")
    sys.exit(1)

# --------------------------------------------------------------------------- #
# 配置主机客户端log
# --------------------------------------------------------------------------- #
fmt = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
file_handler = logging.FileHandler(os.getcwd() + "/master_client.log")
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(fmt)

logging.basicConfig()
log = logging.getLogger()
log.setLevel(logging.DEBUG)
log.addHandler(file_handler)

UNIT = 0x01  # 目标从机的地址


async def start_async_test(client):
    """
    测试代码
    """
    try:
        # --------------------------------------------------------------------------- #
        # --------------------------------------------------------------------------- #
        log.debug("\nINFO : 写入单个线圈测试----写入的寄存器地址为0,写入值为True,写入的从机地址为0x01")
        # write_coil 写线圈功能,参数:线圈的地址、写入的数值、从机地址
        rq = await client.write_coil(0, True, unit=UNIT)

        log.debug("\nINFO : 读取单个线圈测试----读取的寄存器地址为0,读取数量为1,读取的从机地址为0x01")
        # read_coils 读线圈功能,参数:线圈的地址、读取线圈的数量、从机地址
        rr = await client.read_coils(0, 1, unit=UNIT)

        assert rq.function_code < 0x80  # 检测写线圈测试响应是否正常
        assert rr.bits[0] == True  # 检测读线圈测试读取的值是否为True
        # --------------------------------------------------------------------------- #
        # --------------------------------------------------------------------------- #
        log.debug("\nINFO : 写入多个线圈测试----写入的寄存器起始地址为1,写入8个值均为True,写入的从机地址为0x01")
        # write_coil 参数:线圈的起始地址、写入的数值(参数为列表则写入对应数量的线圈)、从机地址
        rq = await client.write_coils(1, [True] * 8, unit=UNIT)
        assert rq.function_code < 0x80

        log.debug("\nINFO : 读取多个线圈测试----读取的寄存器起始地址为1,读取数量为8,读取的从机地址为0x01")
        # read_coils 参数:线圈的起始地址、读取线圈的数量、从机地址
        rr = await client.read_coils(1, 8, unit=UNIT)
        assert rr.function_code < 0x80

        resp = [True] * 8  # 构造期望的返回值,8个True
        assert rr.bits == resp  # 将期望值与线圈读回值对比
        # --------------------------------------------------------------------------- #
        # --------------------------------------------------------------------------- #
        log.debug("\nINFO : 写入多个线圈测试----写入的寄存器起始地址为1,写入9个值均为False,写入的从机地址为0x01")
        rq = await client.write_coils(1, [True] * 9, unit=UNIT)
        log.debug("\nINFO : 读取多个线圈测试----读取的寄存器起始地址为1,读取9个值,读取的从机地址为0x01")
        rr = await client.read_coils(1, 9, unit=UNIT)
        assert rq.function_code < 0x80

        # 每次读取线圈,返回值位数会对8向上圆整
        # 且返回值数量不为8的倍数,则剩余位会被置为False
        # 即读取9位数据:1111 1111 1
        # 返回值则为   :1111 1111 1000 0000
        resp = [True] * 9  # 构造期望的返回值,9个True
        resp.extend([False] * 7)  # 构造期望的返回值,圆整到8的倍数,将无效数据置为False
        assert rr.bits == resp  # 将期望值与线圈读回值对比
        # --------------------------------------------------------------------------- #
        # --------------------------------------------------------------------------- #
        log.debug("\nINFO : 读取离散输入寄存器测试----读取的寄存器起始地址为0,读取8个值,读取的从机地址为0x01")
        # read_discrete_inputs 读离散输入功能,参数:离散输入寄存器的地址、读取离散输入寄存器的数量、从机地址
        rr = await client.read_discrete_inputs(0, 8, unit=UNIT)
        assert rq.function_code < 0x80
        # --------------------------------------------------------------------------- #
        # --------------------------------------------------------------------------- #
        log.debug("\nINFO : 写入单个保持寄存器测试----写入的寄存器起始地址为1,写入1个值为88,写入的从机地址为0x01")
        # write_register 读保持寄存器功能,参数:保持寄存器的地址、写入保持寄存器的值、从机地址
        rq = await client.write_register(1, 88, unit=UNIT)
        log.debug("\nINFO : 读取单个保持寄存器测试----读取的寄存器起始地址为1,读取1个值,读取的从机地址为0x01")
        # read_holding_registers 写保持寄存器功能,参数:保持寄存器的地址、读取保持寄存器的数量、从机地址
        rr = await client.read_holding_registers(1, 1, unit=UNIT)

        assert rq.function_code < 0x80
        assert rr.registers[0] == 88  # 将期望值与保持寄存器读回值对比
        # --------------------------------------------------------------------------- #
        # --------------------------------------------------------------------------- #
        log.debug("\nINFO : 写入多个保持寄存器测试----写入的寄存器起始地址为1,写入8个值均为66,写入的从机地址为0x01")
        rq = await client.write_registers(1, [66] * 8, unit=UNIT)
        log.debug("\nINFO : 读取多个保持寄存器测试----读取的寄存器起始地址为1,读取8个值,读取的从机地址为0x01")
        rr = await client.read_holding_registers(1, 8, unit=UNIT)
        assert rq.function_code < 0x80
        assert rr.registers == [66] * 8  # 将期望值与保持寄存器读回值对比
        # --------------------------------------------------------------------------- #
        # --------------------------------------------------------------------------- #
        arguments = {
            "read_address": 1,
            "read_count": 8,
            "write_address": 1,
            "write_registers": [77] * 8,
        }
        log.debug("\nINFO : 同时读写保持寄存器测试----读写输入寄存器起始地址均为1,读写8个值,写入8个值均为20,读写的从机地址为0x01")
        rq = await client.readwrite_registers(unit=UNIT, **arguments)
        rr = await client.read_holding_registers(1, 8, unit=UNIT)
        assert rq.function_code < 0x80
        assert rq.registers == [77] * 8
        assert rr.registers == [77] * 8  # 将期望值与保持寄存器读回值对比
        # --------------------------------------------------------------------------- #
        # --------------------------------------------------------------------------- #
        log.debug("\nINFO : 读取输入寄存器测试----读取输入寄存器起始地址为1,读取8个值,读取的从机地址为0x01")
        # read_input_registers 读取输入寄存器功能,参数:输入寄存器的起始地址、读取输入寄存器的数量、从机地址
        rr = await client.read_input_registers(1, 8, unit=UNIT)
        assert rq.function_code < 0x80
    except Exception as e:
        log.exception(e)
        client.transport.close()
    await asyncio.sleep(0.5)
    import sys

    sys.exit()


if __name__ == "__main__":
    # 启动主机客户端,监听串行端口'/dev/ttymxc2',串口波特率115200,modbus传输方式rtu
    loop, serial_client = ModbusClient(
        schedulers.ASYNC_IO, port="/dev/ttymxc2", baudrate=115200, method="rtu"
    )
    loop.run_until_complete(start_async_test(serial_client.protocol))
    loop.close()

示例代码修改自官方提供Demo, 此Demo使用了pymodbus基于asyncio异步库实现的Modbus协议RTU通讯、串口异步主机, 代码里我们设置对外的通讯端口,是在硬件配置小节中开启的串行端口’/dev/ttymxc2’。

当代码启动时,会以异步方式去启动 start_async_test 函数功能, 函数中对Modbus协议的各项功能进行了测试,主要有线圈读写、离散输入寄存器读取、输入寄存器读取、保持寄存器读写。

10.4.2. RTU从机-slave_server.py

代码如下:

配套代码 io/modbus/slave_server.py文件内容
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
#!/usr/bin/env python
"""
a simple device simulator.
"""
import os
import logging

from pymodbus.framer.rtu_framer import ModbusRtuFramer
from pymodbus.server.sync import StartTcpServer, StartSerialServer
from pymodbus.datastore import (
    ModbusSequentialDataBlock,
    ModbusSlaveContext,
    ModbusServerContext,
)

# 创建log对象,log对象为pymodbus
pymodbus_logger = logging.getLogger("pymodbus")
# 设置log记录等级,log等级可以参考
# logging.CRITICAL
# logging.ERROR
# logging.WARNING
# logging.INFO
# logging.DEBUG
pymodbus_logger.setLevel(logging.DEBUG)

if __name__ == "__main__":
    # 设置log记录格式:log时间-log发生对象-log等级-log打印信息
    fmt = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    # 创建log文件对象,文件创建在当前目录,文件名为slave.log
    file_handler = logging.FileHandler(os.getcwd() + "/slave_server.log")
    # 设置文件对象记录log的等级
    file_handler.setLevel(logging.DEBUG)
    # 设置文件对象log的格式
    file_handler.setFormatter(fmt)
    # 添加log文件对象到当前log系统中,这样产生log后,log会被同步写入到文件中
    pymodbus_logger.addHandler(file_handler)

    # 定义co线圈寄存器,存储起始地址为0,长度为20,内容为15个True及5个False
    co_block = ModbusSequentialDataBlock(0, [True] * 15 + [False] * 5)
    # 定义di离散输入寄存器,存储起始地址为0,长度为20,内容为15个True及5个False
    di_block = ModbusSequentialDataBlock(0, [True] * 15 + [False] * 5)
    # 定义ir输入寄存器,存储起始地址为0,长度为10,内容为0~10递增数值列表
    ir_block = ModbusSequentialDataBlock(0, [i for i in range(10)])
    # 定义hr保持寄存器,存储起始地址为0,长度为10,内容为0~10递增数值列表
    hr_block = ModbusSequentialDataBlock(0, [i for i in range(10)])

    # 创建从机,从机的di离散量、co线圈、hr保持寄存器、ir输入寄存器等由上面定义并传入
    slaves = ModbusSlaveContext(di=di_block, co=co_block, hr=hr_block, ir=ir_block)
    # 创建单从机上下文,交由服务器调度
    context = ModbusServerContext(slaves=slaves, single=True)

    # 如果需要创建多个从机,参考如下:从机地址 + 从机配置
    # slaves = {
    # 1:ModbusSlaveContext(di=di_block, co=co_block, hr=hr_block, ir=ir_block),
    # 2:ModbusSlaveContext(di=di_block, co=co_block, hr=hr_block, ir=ir_block),
    # 3:ModbusSlaveContext(di=di_block, co=co_block, hr=hr_block, ir=ir_block)
    # }
    # context = ModbusServerContext(slaves=slaves, single=False)

    # 开启tcp服务器方法
    # StartTcpServer(context, address=('127.0.0.1', 5020))
    # 开启串行设备服务器方法
    # 参数:从机上下文、从机通行帧格式、监听设备'/dev/ttymxc1'、串口波特率、从机监听超时时间0为不阻塞立刻响应
    StartSerialServer(
        context, framer=ModbusRtuFramer, port="/dev/ttymxc1", baudrate=115200, timeout=0
    )

示例代码修改自官方提供Demo,此Demo使用了pymodbus实现的Modbus协议RTU通讯、串口同步从机, 代码里我们监听了在硬件配置小节中开启的串行端口’/dev/ttymxc1’的数据。

使用pymodbus模拟Modbus从设备的功能,需要构造从机上下文环境, 该环境由Modbus协议规定,需要实现Modbus从机的各项功能,比如各种寄存器。 最终该上下文由Server负责调度运作,实现主机对从机数据的存入读出维护, pymodbus还在此基础上实现了数据库的接口,感兴趣的同学可以查看pymodbus源码及示例。

10.4.3. 实验步骤

将配套代码io\modbus目录中的测试代码上传至开发板任一目录下:

打开两个终端,依次启动从机、主机:

终端1,启动从机:

# 在终端中输入如下命令:
python3 slave_server.py

从机启动不会有响应,启动记录会以log形式输入到启动从机的目录下, 文件slave_server.log中。

可以在其他终端中输入如下命令,查看log文件内容。

# 在终端中输入如下命令:
cat slave_server.log
broken

终端2,启动主机:

# 在终端中输入如下命令:
python3 master_client.py

主机启动后,执行的每一步操作会有对应log信息输出,输出内容也会以log形式输入到主机启动目录下, 文件master_client.log中。

命令执行后,终端2打印出的信息如下:

broken

在测试代码中,可以看到,主机对从机发送的命令、接收到从机的响应信息及各项测试结果等等。

如主机没有正常结束退出,请检查硬件连接。

10.4.4. 现象简析

我们可以通过一张图来简单看看,Modbus-RTU主从机在通讯过程中发生的一些请求与响应。

写入多个保持寄存器测试 为例(单击图片可放大):

broken

图中,在主机部分,我们编写的主机测试代码,发送了Modbus功能码-16(0x10)写多个寄存器的命令及对应参数, 此命令会操作从机的保持寄存器中的内容,这部分内容是我们在从机代码中定义的上下文部分。 当从机收到命令后,会做出对应的响应,并将响应的log信息,记录在slave_server.log文件中。

自此,一次Modbus-RTU主从机通讯完成。

在前面,我们提到了,pymodbus库是使用Python语言编写的Modbus协议的全栈实现, 在我们给大家提供的示例代码中,只展示了该库的一部分功能。

在pymodbus库中还支持用户自定义的请求及响应,即Modbus协议中约定的用户定义功能码相关内容。 此外pymodbus库还是实现了一些服务器功能,比如数据持久化,可以让用户将从机寄存器内容通过 一些常见的数据库进行存储。

感兴趣的同学可以关注,pymodbus的Github仓库 pymodbus , 学习更多相关的内容。

也可以参考官方给的示例代码, pymodbus-examples