源码将于最后一遍文章给出下载
串口(COM)通讯开发
本章节测试使用了 Configure Virtual Serial Port Driver虚拟串口工具和本人自写的串口调试工具,请自行baidu下载对应工具

在com.zxy.common.Com_Para.py中添加如下内容
- #RS232串口通讯列表 串口号,波特率,数据位,索引(A,B,C,D区分),多串口分割符;
- ComPortList = "" #linux参考:/dev/ttyS0,9600,8,0,A;/dev/ttyS1.9600,8,0,B windwows参考:COM1,9600,8,0;COM2,9600,8,2
- #串口通讯全局变量hashtable
串口索引---串口对象 - htComPort = {}
在com.zxy.main.Init_Page.py中添加如下内容
- @staticmethod
- def Start_ComPort():
- iIndex = 0
- for temComPort in Com_Para.ComPortList.split(";"):
- iIndex = iIndex + 1
- temComPortInfo = temComPort.split(",")
- try:
- if len(temComPortInfo) == 5 and Com_Fun.GetHashTableNone(Com_Para.htComPort, temComPortInfo[4]) is None:
- temCD = ComDev(temComPortInfo[0], int(temComPortInfo[1]), int(temComPortInfo[2]), int(temComPortInfo[3]), iIndex)
- temCD.attPortName = temComPortInfo[4]
- Com_Fun.SetHashTable(Com_Para.htComPort, temComPortInfo[4], temCD)
- except Exception as e:
- print("com link error:COM"+temComPortInfo[0]+"==>" + repr(e)+"=>"+str(e.__traceback__.tb_lineno))
- finally:
- Pass
创建串口设备管理类com.zxy.comport.ComDev.py
- #! python3
- # -*- coding: utf-8 -
- '''
- Created on 2017年05月10日
- @author: zxyong 13738196011
- '''
-
- import datetime,threading,time,serial
- from com.zxy.common.Com_Fun import Com_Fun
- from com.zxy.adminlog.UsAdmin_Log import UsAdmin_Log
- from com.zxy.common import Com_Para
- from com.zxy.z_debug import z_debug
-
- #监测数据采集物联网应用--串口设备管理
- class ComDev(z_debug):
- attIndex = 0
- attPort = 0
- attBaudrate = 9600
- attBytesize = 8
- attSerial = None
- #超时时间(秒) 为了验证测试效果,将时间设置为10秒
- attTimeout = 10
- #返回值
- attReturnValue = None
- attPortName = ""
- #特殊插件处理
- attProtocol = ""
- #回发数据
- attSendValue = None
- #线程锁
- attLock = threading.Lock()
-
- def __init__(self, inputPort,inputBaudrate,inputBytesize,inputparity,inputIndex):
- self.attPort = inputPort
- self.attBaudrate = inputBaudrate
- self.attBytesize = inputBytesize
- temParity = "N"
- if str(inputparity) == "0": #无校验
- temParity = "N"
- elif str(inputparity) == "1": #偶校验
- temParity = "E"
- elif str(inputparity) == "2": #奇校验
- temParity = "O"
- elif str(inputparity) == "3":
- temParity = "M"
- elif str(inputparity) == "4":
- temParity = "S"
- self.attSerial = serial.Serial(port=self.attPort,baudrate=self.attBaudrate,bytesize=self.attBytesize,parity=temParity, stopbits=1)
- self.attSerial.timeout = self.attTimeout
- self.attIndex = inputIndex
- self.OpenSeriaPort()
-
- #打开串口
- def OpenSeriaPort(self):
- try:
- if not self.attSerial.isOpen():
- self.attSerial.open()
- t = threading.Thread(target=self.OnDataReceived, name="ComPortTh" + str(self.attIndex))
- t.start()
-
- uL = UsAdmin_Log(Com_Para.ApplicationPath,str("ComPortTh" + str(self.attIndex)))
- uL.SaveFileDaySub("thread")
- print("Open ComPortTh" + str(self.attIndex)+" COM:"+str(self.attSerial.port)+" "+Com_Fun.GetTimeDef()+" lenThreads:"+str(len(threading.enumerate())))
- return True
- except Exception as e:
- if str(type(self)) == "
" : - self.debug_in(self,repr(e)+"=>"+str(e.__traceback__.tb_lineno))#打印异常信息
- else:
- self.debug_in(repr(e)+"=>"+str(e.__traceback__.tb_lineno))#打印异常信息
- return False
- finally:
- pass
-
- #关闭串口
- def CloseSeriaPort(self):
- try:
- if not self.attSerial.isOpen():
- self.attSerial.close()
- return True
- except Exception as e:
- if str(type(self)) == "
" : - self.debug_in(self,repr(e)+"=>"+str(e.__traceback__.tb_lineno))#打印异常信息
- else:
- self.debug_in(repr(e)+"=>"+str(e.__traceback__.tb_lineno))#打印异常信息
- return False
- finally:
- pass
-
- #发送数据无返回
- def WritePortDataImmed(self,inputByte):
- try:
- if not self.attSerial.isOpen():
- self.OpenSeriaPort()
- if self.attSerial.isOpen() and self.attLock.acquire():
- self.attReturnValue = None
- temNumber = self.attSerial.write(inputByte)
- time.sleep(0.2)
- self.attLock.release()
- return temNumber
- else:
- return 0
- except Exception as e:
- if str(type(self)) == "
" : - self.debug_in(self,repr(e)+"=>"+str(e.__traceback__.tb_lineno))#打印异常信息
- else:
- self.debug_in(repr(e)+"=>"+str(e.__traceback__.tb_lineno))#打印异常信息
- return -1
-
- #返回值为字节,带结束符
- def WritePortDataFlag(self,inputByte,EndFlag):
- try:
- if not self.attSerial.isOpen():
- self.OpenSeriaPort()
- if self.attSerial.isOpen() and self.attLock.acquire():
- self.attReturnValue = None
- temNumber = self.attSerial.write(inputByte)
- starttime = datetime.datetime.now()
- endtime = datetime.datetime.now() + datetime.timedelta(seconds=self.attTimeout)
- while (self.attReturnValue is None or self.attReturnValue[len(self.attReturnValue) - len(EndFlag):len(self.attReturnValue)] != EndFlag.encode(Com_Para.U_CODE)) and starttime <= endtime:
- starttime = datetime.datetime.now()
- time.sleep(0.2)
- self.attLock.release()
- return temNumber
- except Exception as e:
- if str(type(self)) == "
" : - self.debug_in(self,repr(e)+"=>"+str(e.__traceback__.tb_lineno))#打印异常信息
- else:
- self.debug_in(repr(e)+"=>"+str(e.__traceback__.tb_lineno))#打印异常信息
- return -1
- finally:
- pass
-
- #返回值为字节
- def WritePortData(self,inputByte):
- try:
- if not self.attSerial.isOpen():
- self.OpenSeriaPort()
- if self.attSerial.isOpen() and self.attLock.acquire():
- self.attReturnValue = None
- temNumber = self.attSerial.write(inputByte)
- starttime = datetime.datetime.now()
- endtime = datetime.datetime.now() + datetime.timedelta(seconds=self.attTimeout)
- while self.attReturnValue is None and starttime <= endtime:
- starttime = datetime.datetime.now()
- time.sleep(0.2)
- self.attLock.release()
- return temNumber
- except Exception as e:
- if str(type(self)) == "
" : - self.debug_in(self,repr(e)+"=>"+str(e.__traceback__.tb_lineno))#打印异常信息
- else:
- self.debug_in(repr(e)+"=>"+str(e.__traceback__.tb_lineno))#打印异常信息
- return -1
- finally:
- pass
-
- #接收数据
- def OnDataReceived(self):
- try:
- while self.attSerial.isOpen():
- temNum = self.attSerial.inWaiting()
- if temNum > 0:
- if self.attReturnValue is None:
- self.attReturnValue = self.attSerial.read(temNum)
- else:
- self.attReturnValue = self.attReturnValue + self.attSerial.read(temNum)
- else:
- time.sleep(1)
- except Exception as e:
- if str(type(self)) == "
" : - self.debug_in(self, repr(e)+"=>"+str(e.__traceback__.tb_lineno))
- else:
- self.debug_in(repr(e)+"=>"+str(e.__traceback__.tb_lineno))
- self.attReturnValue = None
串口通讯测试案例MonitorDataCmd.py主文件中编写:
在该语句下添加

- #串口配置参数
- Com_Para.ComPortList = "COM2,9600,8,0,A;COM4,9600,8,2,B"
- #串口连接初始化
- Init_Page.Start_ComPort()
- #测试串口数据发送和接收
- temCP2 = Com_Fun.GetHashTable(Com_Para.htComPort,"A")#获取串口2对象
- temCP4 = Com_Fun.GetHashTable(Com_Para.htComPort,"B")#获取串口4对象
- temByte1 = ("AABBCCDDVV").encode(Com_Para.U_CODE) #发送字符串转byte[]
- temByte2 = ("11223344KM").encode(Com_Para.U_CODE) #发送字符串转byte[]
-
- print("开始发送串口数据")
- temRec1 = temCP2.WritePortData(temByte1)#往串口2发送数据
- print("串口2发送数据长度:"+str(temRec1))
- strRec = ""
- if temCP2.attReturnValue != None:
- strRec = temCP2.attReturnValue.decode(Com_Para.U_CODE)#收到串口数据
- print("串口2收到数据值:"+strRec)
-
- temRec2 = temCP4.WritePortData(temByte2)#往串口4发送数据
- print("串口3发送数据长度:"+str(temRec2))
- strRec = ""
- if temCP4.attReturnValue != None:
- strRec = temCP4.attReturnValue.decode(Com_Para.U_CODE)#收到串口数据
- print("串口4收到数据值:"+strRec)
串口调试测试结果:
