站点图标 AI技术聚合

Python 实现上位机(一)

目的

记录并分享一个用 python 实现上位机功能的思路与模板。将通过三个方面分享:

  1. Python 自定义简易通信规约,并实现 Server 端
  2. Python 实现 Client 端
  3. PyQt5 绘制上位机界面,通过上位机作为 Client 访问 Server 端

本章将用 Python 实现一个简易的自定义规约。

自定义规约

首先假设使用场景为通过该协议向某 Client 的规定地址内读写数据。规定协议 Frame 的 Header 为 0x23,功能码 1字节,地址 2字节,数据 2字节,CRC校验两字节。Frame 总长度固定为 8字节。规约名字为 dummyCom。

'''
This script creates a dummy protocol:
+=======================================================================+
| Byte 0 | Byte 1 | Byte 2 | Byte 3 | Byte 4 | Byte 5 | Byte 6 | Byte 7 |
| ------ | ------ | ------ | ------ | ------ | ------ | ------ | ------ |
| Header |  FUNC  | ADDR_H | ADDR_L | DATA_H | DATA_L | CRC_H  | CRC_L  |
+=======================================================================+

The protocol frame always consists of 8 bytes, with:
Frame[0]:   Frame header: always be 0x23.
Frame[1]:   Function code:
                FC for client:
                    0x03: read from an address, data bytes should all be 0x00
                    0x06: write to an address with data specified in frame
                FC for server:
                    0x83: reply to client read request (0x03)
                    0x86: reply to client write request (0x06)
                    0xFF: reply to client, indicating request error
Frame[2]:   Address high
Frame[3]:   Address low
Frame[4]:   Data high
Frame[5]:   Data low
Frame[6]:   CRC high
Frame[7]:   CRC low
'''

对于 Client 端,功能码 0x03 为从给定地址读取数据。功能码0x06 为将 Frame 中 data 位的数据写入给定地址。我们之后要编写的上位机即是一个图形化界面的 Client 端。

对于 Server端,功能码 0x83 为回复 Client 0x03 的请求。功能码 0x86 为回复 Client 0x06 的请求。当 Server 端收到 Client 未知功能码时,回复功能码为 0xFF。

Python 实现简单协议栈

将 bytes 打印为 hex stream

为了方便调试,首先定义一个函数,将受到的 bytes 打印为 hex stream。

def bprint(msg: bytes, verbose=False):
    bStr = ''
    for byte in msg:
        bStr += f'{byte:>02X} '
    bStr = bStr.rstrip()
    if verbose:
        print(bStr)
    return bStr

CRC 校验

调用 crc 库实现简单的 CRC校验

#region CRC
from crc import Calculator, Crc16

crcCalc = Calculator(Crc16.MODBUS) # type:ignore

def calc_crc_dummy(msg: bytes):
    # calculate frame[0] to frame[5]
    return crcCalc.checksum(msg[:6])

def check_crc_dummy(msg: bytes):
    # check if the message CRC is correct
    return crcCalc.verify(msg[:6], msg[6]*0x100 + msg[7])

#endregion

编写协议栈

协议栈包含两个主要功能:一个是 parse_msg,即解析收到的报文;一个是 generate_msg,即生成需要的报文。实现代码如下:

#region dummyComStack
class dummyComStack(object):
    def __init__(self):
        # initialize something here
        pass
    
    def parse_msg(self, msg: bytes):
        if len(msg)!=8:
            return None, None, f'Not a dummyCom message. Expect message length: 8. Get message length: {len(msg)}.'
        if msg[0]!=0x23:
            return None, None, f'Not a dummyCom message. Expect message header: 0x23. Get message header: 0x{msg[0]:>02X}'
        if not check_crc_dummy(msg):
            return None, None, f'CRC error.'
        fc = msg[1]
        addr = msg[2]*0x100 + msg[3]
        data = msg[4]*0x100 + msg[5]
        return fc, addr, data

    def generate_msg(self, fc: int, addr: int, data: int):
        msg = [0x00]*8
        msg[0] = 0x23   # Header
        msg[1] = fc     # Function code
        msg[2], msg[3] = divmod(addr, 0x100)    # address
        msg[4], msg[5] = divmod(data, 0x100)    # data value
        msgCRC = calc_crc_dummy(bytes(msg)) 
        msg[6], msg[7] = divmod(msgCRC, 0x100) # CRC
        return bytes(msg)

#endregion

Python 实现 Server 端(简单版)

Python sqlite3 创建数据库

根据之前定义的使用场景,现在用一个简易的数据库来记录 <地址,数据>。创建一个 dummyDB.db,并在其中创建一个 Table dummyData,键为 address 和 value,类型都是 INT。创建之后,再插入 65536 对 <地址,数据>,其中地址从 0~65535,每个地址对应的数据与地址序号相同 (即 value = address)。

数据库生成脚本如下:

'''
initDB.py
'''

import sqlite3
import os

DB_NAME = "dummyDB.db"
TABLE_NAME = "dummyData"
KEYS = ("address", "value")

if __name__ == '__main__':
    # remove old DB
    if os.path.exists(DB_NAME):
        os.remove(DB_NAME)
    # create a new DB
    conn = sqlite3.connect(DB_NAME)
    cursor = conn.cursor()
    # create table
    sql = f"CREATE TABLE {TABLE_NAME} ({KEYS[0]} INT, {KEYS[1]} INT);"
    cursor.execute(sql)
    # create data, address from 0x0000 to 0xFFFF
    data = []
    for i in range(0,0x10000):
        # initial value equals to address index
        data.append((i,i))
    # insert data
    sql = f"INSERT INTO {TABLE_NAME}{KEYS} VALUES(?, ?);"
    cursor.executemany(sql, data)
    conn.commit()
    cursor.close()
    conn.close()

运行脚本之后,得到 dummyDB.db。

实现 TCP Server

为了本机调试方便,Server 假定为 TCP Server,并启动在 localhost,端口 2333。利用 socketserver 库实现一个简易的 TCP Server:

#region dummyComServer
import socketserver
import sqlite3


class dummyComHandler(socketserver.BaseRequestHandler):
    def handle(self):
        stack = dummyComStack()
        _dbconn = sqlite3.connect('dummyDB.db')
        database = _dbconn.cursor()
        while True:
            self.data = self.request.recv(1024)
            bprint(self.data, verbose=True)
            fc, addr, data = stack.parse_msg(self.data)
            if fc==0x03:
                # read
                try:
                    # get value from address in database
                    cmd = f'SELECT value FROM dummyData WHERE address={addr}'
                    database.execute(cmd)
                    value = database.fetchone()[0]
                    msg = stack.generate_msg(0x83, addr, value) # type:ignore
                except Exception as err:
                    # error happens
                    print(err)
                    msg = stack.generate_msg(0xFF, 0, 0)
            elif fc==0x06:
                # write
                try:
                    # set value to address in database
                    cmd = f'UPDATE dummyData SET value={data} WHERE address={addr}'
                    database.execute(cmd)
                    # save db
                    _dbconn.commit()
                    msg = stack.generate_msg(0x86, addr, data) # type:ignore
                except Exception as err:
                    # error happens
                    print(err)
                    msg = stack.generate_msg(0xFF, 0, 0)
            else:
                # unknown dummy request
                msg = stack.generate_msg(0xFF, 0, 0)
            self.request.sendall(msg)
            

class dummyComServer(object):
    def __init__(self, comParam=('localhost', 2333)):
        self.comParam = comParam

    def run(self):
        with socketserver.TCPServer(self.comParam, dummyComHandler) as server:
            server.serve_forever()

#endregion

Server 在启动时连接之前生成的 dummyDB.db 数据库。在收到功能码为 0x03 的正确 request 报文之后,从解析出来的地址中读取数据,并通过 stack.generate_msg 生成 reply 报文,发送给 Client 端。在收到功能码为 0x06 的正确 request 报文之后,将解析出来的数据写入解析出来的地址中,并通过 stack.generate_msg 生成 reply 报文,发送给 Client 端。当 Exception 发生时,Server 向 Client 发送功能码为 0xFF,地址为0,数据为0 的报文帧。

运行与调试

Server 运行主入口:

'''
dummyCom.py
'''
if __name__ == '__main__':
    dummyServer = dummyComServer()
    dummyServer.run()

在 Terminal 中运行 

python ./dummyCom.py

在另一个 Terminal 中进入 Python 解释器,利用 dummyComStack 以及 socket 测试 Server:

Socket 成功建立,并收到 Server 端的报文回复。通过 dummyComStack 可以正确解析。

下一篇将实现 Client 端。

文章出处登录后可见!

已经登录?立即刷新
退出移动版