8.26. modbus_rt 模块 API 文档

8.26.1. API

8.26.1.1. class data_trans(_modbus_rt._data_trans):

# Signature stubs only: every body is elided with `...`.
# reg2reg and regs2regs take just `val`; every other conversion also accepts
# a `mode` argument that defaults to LITTLE_ENDIAL_SWAP.
# NOTE(review): the scalar encoders (signed2regs .. double2regs) annotate
# `val` as list -- confirm whether a single value or a list is expected.
def reg2reg(self,val:int):...
def regs2regs(self,val:list):...
def regs2bytes(self,val:list,mode=LITTLE_ENDIAL_SWAP):...
def regs2str(self,val:list,mode=LITTLE_ENDIAL_SWAP):...
def regs2signed(self,val:list,mode=LITTLE_ENDIAL_SWAP):...
def regs2int(self,val:list,mode=LITTLE_ENDIAL_SWAP):...
def regs2uint(self,val:list,mode=LITTLE_ENDIAL_SWAP):...
def regs2long(self,val:list,mode=LITTLE_ENDIAL_SWAP):...
def regs2float(self,val:list,mode=LITTLE_ENDIAL_SWAP):...
def regs2double(self,val:list,mode=LITTLE_ENDIAL_SWAP):...
def bytes2regs(self,val:any,mode=LITTLE_ENDIAL_SWAP):...
def str2regs(self,val:str,mode=LITTLE_ENDIAL_SWAP):...
def signed2regs(self,val:list,mode=LITTLE_ENDIAL_SWAP):...
def int2regs(self,val:list,mode=LITTLE_ENDIAL_SWAP):...
def uint2regs(self,val:list,mode=LITTLE_ENDIAL_SWAP):...
def long2regs(self,val:list,mode=LITTLE_ENDIAL_SWAP):...
def float2regs(self,val:list,mode=LITTLE_ENDIAL_SWAP):...
def double2regs(self,val:list,mode=LITTLE_ENDIAL_SWAP):...

8.26.1.2. class rtu(_modbus_rt._rtu):

# Signature stubs only: every body is elided with `...`.
# The constructor defaults to mode=SLAVE; the examples pass cst.MASTER to
# create a master.
# In the examples, excuse() is called as (slave, function, addr, ...) on a
# master and as (cst.READ / cst.WRITE, block type, addr, ...) on a slave.
def __init__(self,mode=SLAVE):...
def set_serial(self,devname:str,baudrate=115200,bytesize=8,parity='N',stopbits=1,xonxoff=0):...
def set_over_type(self,over_type:int):...
def set_net(self,ip='',port=502,type=SOCK_STREAM):...
def set_ip(self,ip:str):...
def set_port(self,port:int):...
def set_type(self,type:int):...
def set_p2p(self,p2p_flag:int):...
def open(self):...
def isopen(self):...
def close(self):...
def set_addr(self,addr:int):...
def set_strict(self,strict:int):...
def add_block(self,name:str,type:int,addr:int,nums:int):...
def regs_binding(self,regs:bytes,type:int,addr:int,nums:int):...
def set_pre_ans_callback(self,cb):...
def set_done_callback(self,cb):...
def set_dev_binding(self,flag:int):...
def set_server(self,saddr:str,sport:int):...
def get_saddr(self):...
def excuse(self,dir_slave:int,type_function:int,addr:int,*val):...
def download(self,slave:int,file_dev:str,file_master:str):...
def upload(self,slave:int,file_dev:str,file_master:str):...

8.26.1.3. class ascii(rtu):

8.26.1.4. class tcp(_modbus_rt._tcp):

# Signature stubs only: every body is elided with `...`.
# The method list matches the rtu class except that set_serial and
# set_over_type are absent.
def __init__(self,mode=SLAVE):...
def set_net(self,ip='',port=502,type=SOCK_STREAM):...
def set_ip(self,ip:str):...
def set_port(self,port:int):...
def set_type(self,type:int):...
def set_p2p(self,p2p_flag:int):...
def open(self):...
def isopen(self):...
def close(self):...
def set_addr(self,addr:int):...
def set_strict(self,strict:int):...
def add_block(self,name:str,type:int,addr:int,nums:int):...
def regs_binding(self,regs:bytes,type:int,addr:int,nums:int):...
def set_pre_ans_callback(self,cb):...
def set_done_callback(self,cb):...
def set_dev_binding(self,flag:int):...
def set_server(self,saddr:str,sport:int):...
def get_saddr(self):...
def excuse(self,dir_slave:int,type_function:int,addr:int,*val):...
def download(self,slave:int,file_dev:str,file_master:str):...
def upload(self,slave:int,file_dev:str,file_master:str):...

8.26.2. Examples

8.26.2.1. tcp2rtu_dtu.py

import modbus_rt
import modbus_rt_defines as cst

serial_name = "/dev/ttyUSB0"
# Empty IP string -- presumably means "any local interface"; confirm.
ip_addr = ""

# Serial-side master: opened before the callbacks that forward requests to it
# are registered.
rm = modbus_rt.rtu(cst.MASTER)
rm.set_serial(serial_name)
rm.open()
# Network-side instance; no mode is passed, so the constructor default
# (SLAVE) applies.
ts = modbus_rt.tcp()
ts.set_net(ip_addr, 502, cst.SOCK_STREAM)
def pre_call(evt):
    # Hook registered on `ts` with set_pre_ans_callback: for the two handled
    # read requests, fetch the data from the serial master `rm` and write it
    # into the matching table of `ts`.
    # NOTE(review): presumably runs before `ts` builds its reply -- confirm
    # against the callback contract.
    unit = evt.slave
    func = evt.function
    start = evt.addr
    count = evt.quantity
    # Pick the local table that corresponds to the requested function code.
    if func == cst.READ_HOLDING_REGISTERS:
        table = cst.REGISTERS
    elif func == cst.READ_DISCRETE_INPUTS:
        table = cst.INPUTS
    else:
        # Any other function code is not forwarded.
        return
    values = rm.excuse(unit, func, start, count)
    ts.excuse(cst.WRITE, table, start, count, values)
def done_call(evt):
    # Hook registered on `ts` with set_done_callback: after a handled write
    # request, read the written value(s) back out of `ts` and replay the same
    # write on the serial master `rm`.
    slave = evt.slave
    function = evt.function
    addr = evt.addr
    quantity = evt.quantity
    if function == cst.WRITE_SINGLE_COIL:
        data = ts.excuse(cst.READ, cst.CIOLS, addr, 1)
        # Single writes forward the value itself, not a list.
        rm.excuse(slave, function, addr, data[0])
    elif function == cst.WRITE_SINGLE_REGISTER:
        data = ts.excuse(cst.READ, cst.REGISTERS, addr, 1)
        rm.excuse(slave, function, addr, data[0])
    elif function == cst.WRITE_MULTIPLE_COILS:
        data = ts.excuse(cst.READ, cst.CIOLS, addr, quantity)
        rm.excuse(slave, function, addr, quantity, data)
    elif function == cst.WRITE_MULTIPLE_REGISTERS:
        # Use cst.READ rather than a bare 0 so that all four read-backs name
        # the direction the same way.
        data = ts.excuse(cst.READ, cst.REGISTERS, addr, quantity)
        rm.excuse(slave, function, addr, quantity, data)
# add_block arguments are (name, type, addr, nums).
# NOTE(review): the type arguments are bare numbers here, while the other
# examples pass cst.* names (e.g. cst.REGISTERS) -- confirm the mapping and
# prefer the named constants.
ts.add_block("A", 0, 20000, 10)
ts.add_block("B", 1, 10000, 16)
ts.add_block("C", 4, 0, 10)
ts.set_strict(0)
ts.set_pre_ans_callback(pre_call)
ts.set_done_callback(done_call)
# Open last, once the blocks and both callbacks are in place.
ts.open()

8.26.2.2. rtu_slave_for_udp.py

import modbus_rt
import modbus_rt_defines as cst

ip_addr = "192.168.28.150"

# rtu() without a mode argument uses the constructor default (SLAVE).
rsu =  modbus_rt.rtu()
# Carry the RTU instance over the network (UDP socket type) instead of a
# serial port.
rsu.set_over_type(cst.OVER_NET)
rsu.set_net(ip_addr, 502, cst.SOCK_DGRAM)
# One block named "A": type cst.REGISTERS, start address 0, 10 entries.
rsu.add_block("A",cst.REGISTERS, 0, 10)
rsu.open()
# Sample calls, left commented out. Presumably: read 5 entries at address 0,
# write 5 values there, then read them back -- confirm the argument meaning.
# rsu.excuse(0,4,0,5)
# rsu.excuse(1,4,0,5,[1,2,34,5,6])
# rsu.excuse(0,4,0,5)

8.26.2.3. udp_slave.py

import modbus_rt
import modbus_rt_defines as cst

ip_addr = "192.168.28.150"

# tcp() without a mode argument uses the constructor default (SLAVE); the
# UDP socket type is selected through set_net.
tsu = modbus_rt.tcp()
tsu.set_net(ip_addr, 502, cst.SOCK_DGRAM)
# One block named "A": type cst.REGISTERS, start address 0, 10 entries.
tsu.add_block("A",cst.REGISTERS, 0, 10)
tsu.open()
# Sample calls, left commented out. Presumably: read 5 entries at address 0,
# write 5 values there, then read them back -- confirm the argument meaning.
# tsu.excuse(0,4,0,5)
# tsu.excuse(1,4,0,5,[1,2,34,5,6])
# tsu.excuse(0,4,0,5)

8.26.2.4. rtu_slave.py

import modbus_rt
import modbus_rt_defines as cst

serial_name = "uart1"

# rtu() without a mode argument uses the constructor default (SLAVE).
rs =  modbus_rt.rtu()
# Only the device name is given; the remaining serial settings keep the
# set_serial defaults (115200 baud, 8 data bits, parity 'N', 1 stop bit).
rs.set_serial(serial_name)
# One block named "A": type cst.REGISTERS, start address 0, 10 entries.
rs.add_block("A",cst.REGISTERS, 0, 10)
rs.open()
# Sample calls, left commented out. Presumably: read 5 entries at address 0,
# write 5 values there, then read them back -- confirm the argument meaning.
# rs.excuse(0,4,0,5)
# rs.excuse(1,4,0,5,[1,2,34,5,6])
# rs.excuse(0,4,0,5)

8.26.2.5. dtu_stm32.py

import modbus_rt
import modbus_rt_defines as cst

serial_name = "uart4"
# Empty IP string -- presumably means "any local interface"; confirm.
ip_addr = ""

# Serial-side master: opened before the callbacks that forward requests to it
# are registered.
rm = modbus_rt.rtu(cst.MASTER)
rm.set_serial(serial_name)
rm.open()
# Network-side instance; no mode is passed, so the constructor default
# (SLAVE) applies.
ts = modbus_rt.tcp()
ts.set_net(ip_addr, 502, cst.SOCK_STREAM)
def pre_call(evt):
    # Hook registered on `ts` with set_pre_ans_callback: for discrete-input
    # and coil reads whose start address is within 0..16, fetch the data from
    # the serial master `rm` at a shifted address and write it into `ts`.
    # NOTE(review): only the start address is range-checked, not
    # addr + quantity, and 16 itself is accepted -- confirm both are intended.
    unit = evt.slave
    func = evt.function
    start = evt.addr
    count = evt.quantity
    # Each handled function maps to a device-side address offset and a local
    # table.
    if func == cst.READ_DISCRETE_INPUTS:
        shift = 10000
        table = cst.INPUTS
    elif func == cst.READ_COILS:
        shift = 20000
        table = cst.CIOLS
    else:
        # Any other function code is not forwarded.
        return
    if start < 0 or start > 16:
        return
    values = rm.excuse(unit, func, start + shift, count)
    ts.excuse(cst.WRITE, table, start, count, values)
def done_call(evt):
    # Hook registered on `ts` with set_done_callback: after a coil write whose
    # start address is within 0..16, read the written value(s) back out of
    # `ts` and replay the write on the serial master `rm` at addr + 20000.
    # NOTE(review): only the start address is range-checked, not
    # addr + quantity, and 16 itself is accepted -- confirm both are intended.
    unit = evt.slave
    func = evt.function
    start = evt.addr
    count = evt.quantity
    if func == cst.WRITE_SINGLE_COIL:
        if start < 0 or start > 16:
            return
        written = ts.excuse(cst.READ, cst.CIOLS, start, 1)
        # Single writes forward the value itself, not a list.
        rm.excuse(unit, func, start + 20000, written[0])
    elif func == cst.WRITE_MULTIPLE_COILS:
        if start < 0 or start > 16:
            return
        written = ts.excuse(cst.READ, cst.CIOLS, start, count)
        rm.excuse(unit, func, start + 20000, count, written)
# NOTE(review): unlike the other slave examples, no add_block call is made on
# `ts` before it is opened -- confirm whether the callbacks need local blocks.
ts.set_strict(0)
ts.set_pre_ans_callback(pre_call)
ts.set_done_callback(done_call)
# Open last, once both callbacks are in place.
ts.open()

8.26.2.6. ascii_slave.py

import modbus_rt
import modbus_rt_defines as cst

serial_name = "COM11"

# ascii() without a mode argument uses the constructor default (SLAVE)
# inherited from rtu.
ascii_s =  modbus_rt.ascii()
ascii_s.set_serial(serial_name)
# One block named "A": type cst.REGISTERS, start address 0, 10 entries.
ascii_s.add_block("A",cst.REGISTERS, 0, 10)
ascii_s.open()
# Sample calls, left commented out. Presumably: read 5 entries at address 0,
# write 5 values there, then read them back -- confirm the argument meaning.
# ascii_s.excuse(0,4,0,5)
# ascii_s.excuse(1,4,0,5,[1,2,34,5,6])
# ascii_s.excuse(0,4,0,5)

8.26.2.7. tcp_master.py

import modbus_rt
import modbus_rt_defines as cst

ip_addr = "192.168.28.150"

tm = modbus_rt.tcp(cst.MASTER)
# NOTE(review): the same address is used for the local endpoint (port 0) and
# for the server (port 502) -- presumably both run on one host; adjust for a
# remote server.
tm.set_net(ip_addr, 0, cst.SOCK_STREAM)
tm.set_server(ip_addr, 502)
tm.open()
# Sample calls, left commented out: slave 1, function 3 (read) then
# function 16 (write) on 5 entries at address 0, then read again.
# tm.excuse(1,3,0,5)
# tm.excuse(1,16,0,5,[21,23,24,25,65])
# tm.excuse(1,3,0,5)

8.26.2.8. rtu_slave_for_tcp.py

import modbus_rt
import modbus_rt_defines as cst

ip_addr = "192.168.28.150"

# rtu() without a mode argument uses the constructor default (SLAVE).
rst =  modbus_rt.rtu()
# Carry the RTU instance over the network (stream socket type) instead of a
# serial port.
rst.set_over_type(cst.OVER_NET)
rst.set_net(ip_addr, 502, cst.SOCK_STREAM)
# One block named "A": type cst.REGISTERS, start address 0, 10 entries.
rst.add_block("A",cst.REGISTERS, 0, 10)
rst.open()
# Sample calls, left commented out. Presumably: read 5 entries at address 0,
# write 5 values there, then read them back -- confirm the argument meaning.
# rst.excuse(0,4,0,5)
# rst.excuse(1,4,0,5,[1,2,34,5,6])
# rst.excuse(0,4,0,5)

8.26.2.9. rtu_master.py

import modbus_rt
import modbus_rt_defines as cst

serial_name = "COM11"

rm =  modbus_rt.rtu(cst.MASTER)
# Only the device name is given; the remaining serial settings keep the
# set_serial defaults (115200 baud, 8 data bits, parity 'N', 1 stop bit).
rm.set_serial(serial_name)
rm.open()
# Sample calls, left commented out: slave 1, function 3 (read) then
# function 16 (write) on 5 entries at address 0, then read again.
# rm.excuse(1,3,0,5)
# rm.excuse(1,16,0,5,[21,23,24,25,65])
# rm.excuse(1,3,0,5)

8.26.2.10. tcp_slave.py

import modbus_rt
import modbus_rt_defines as cst

ip_addr = "192.168.28.150"

# tcp() without a mode argument uses the constructor default (SLAVE).
ts = modbus_rt.tcp()
ts.set_net(ip_addr, 502, cst.SOCK_STREAM)
# One block named "A": type cst.REGISTERS, start address 0, 10 entries.
ts.add_block("A",cst.REGISTERS, 0, 10)
ts.open()
# Sample calls, left commented out. Presumably: read 5 entries at address 0,
# write 5 values there, then read them back -- confirm the argument meaning.
# ts.excuse(0,4,0,5)
# ts.excuse(1,4,0,5,[1,2,34,5,6])
# ts.excuse(0,4,0,5)

8.26.2.11. ascii_master.py

import modbus_rt
import modbus_rt_defines as cst

serial_name = "COM11"

# Build the master from the ascii class: constructing modbus_rt.rtu here
# would make this example a duplicate of the RTU master rather than an ASCII
# one. ascii derives from rtu, so the constructor and set_serial/open calls
# are unchanged.
ascii_m = modbus_rt.ascii(cst.MASTER)
ascii_m.set_serial(serial_name)
ascii_m.open()
# Sample calls, left commented out: slave 1, function 3 (read) then
# function 16 (write) on 5 entries at address 0, then read again.
# ascii_m.excuse(1,3,0,5)
# ascii_m.excuse(1,16,0,5,[21,23,24,25,65])
# ascii_m.excuse(1,3,0,5)

8.26.2.12. udp_master.py

import modbus_rt
import modbus_rt_defines as cst

ip_addr = "192.168.28.150"

# Master over the UDP socket type; local port 0 is passed to set_net.
tmu = modbus_rt.tcp(cst.MASTER)
tmu.set_net(ip_addr, 0, cst.SOCK_DGRAM)
tmu.open()

# Sample calls, left commented out. The server is set to the broadcast
# address first; get_saddr() is then called after a request -- presumably to
# learn which server answered (confirm).
# tmu.set_server("255.255.255.255", 502)
# tmu.excuse(1,3,0,5)
# tmu.get_saddr()
# tmu.excuse(1,16,0,5,[21,23,24,25,65])
# tmu.excuse(1,3,0,5)