目录
① 创建两个线程,主线程为ui线程,子线程用于读取摄像头视频,将处理后的图像帧数据(处理操作可以人为添加)返回到主线程进行可视化;
② 子线程向主线程传递视频帧数据集涉及图像的编码操作,主线程接收子线程的数据时涉及图像的解码操作;

① 编码操作:
- # 图像编码函数
- def Encoder(self, img):
- retval, buffer = cv2.imencode('.jpg', img)
- jpg_as_bytes = base64.b64encode(buffer)
- jpg_as_str = jpg_as_bytes.decode('ascii')
- json_object = json.dumps({'img_str': jpg_as_str})
- return json_object
注:img 为 opencv 读取图像的格式(默认uint8, BGR); 编码后的数据 json_object 可以直接通过signal.emit(json_object) 进行发送。
② 解码操作:
- # 图片解码函数
- def Decoder(self, img_json):
- jpg_as_str = json.loads(img_json)['img_str']
- jpg_as_bytes = jpg_as_str.encode('ascii')
- jpg_original = base64.b64decode(jpg_as_bytes)
- jpg_as_np = np.frombuffer(jpg_original, dtype=np.uint8)
- img = cv2.imdecode(jpg_as_np, flags=1)
- return img
注:输入的 img_json 与编码函数生成的 json_object 格式相同;解码生成的 img 可以通过cv2.imshow("window_name", img) 和 cv2.waitKey(0)进行显示。
- import sys
- import time
-
- import json
- import cv2
- import base64
- import numpy as np
-
- from PyQt5.QtCore import *
- from PyQt5.QtGui import *
- from PyQt5.QtWidgets import *
- from PyQt5 import uic
-
- # 子线程1:打开摄像头,返回当前帧图像到主线程
- class Open_Cam(QThread):
- Open_Cam_signal = pyqtSignal(str) # 接受来自主线程的信号
-
- def __init__(self, main_signal):
- super().__init__()
- # 信号绑定槽函数
- self.Open_Cam_signal.connect(self.Open_Cam)
- self.opencam_complete_signal = main_signal # 返回主线程的信号
-
- def Open_Cam(self, Cam):
- # 将json字符串转换
- Cam_index = json.loads(Cam) # 0
- cap = cv2.VideoCapture(Cam_index)
- fps = cap.get(cv2.CAP_PROP_FPS)
- while True:
- ret, self.frame = cap.read()
- '''
- 这里你可以添加对图像进行处理的操作
- '''
- json_object = self.Encoder(self.frame) # 对图片进行编码
- self.opencam_complete_signal.emit(json_object) # 发送编码到主线程
- cv2.waitKey(int((1 / fps) * 1000))
-
- # 图像编码函数
- def Encoder(self, img):
- retval, buffer = cv2.imencode('.jpg', img)
- jpg_as_bytes = base64.b64encode(buffer)
- jpg_as_str = jpg_as_bytes.decode('ascii')
- json_object = json.dumps({'img_str': jpg_as_str})
- return json_object
-
- def run(self):
- while True: # 让子线程一直运行,等待主线程(ui线程)下发的任务
- print("子线程1正在等待执行....")
- time.sleep(2)
-
- class MyWindow(QWidget):
- opencam_complete_signal = pyqtSignal(str) # 子线程返回摄像头图像到主线程的信号
-
- def __init__(self):
- super().__init__()
- self.init_ui()
- self.cam_idx = 0
-
- def init_ui(self):
- self.ui = uic.loadUi("./Cam.ui") # 加载由Qt Designer设计的ui文件
-
- # 加载控件
- self.open_cam_btn = self.ui.pushButton
- self.cam_view = self.ui.graphicsView
-
- # 绑定槽函数
- self.open_cam_btn.clicked.connect(self.open_cam)
- self.opencam_complete_signal.connect(self.view_cam)
-
- self.Opencam_Thread = Open_Cam(self.opencam_complete_signal) # 子线程1
- self.Opencam_Thread.start() # 子线程1运行,等待下发任务
-
- def open_cam(self): # 给子线程1发送打开摄像头的信号
- self.Opencam_Thread.Open_Cam_signal.emit(json.dumps(self.cam_idx))
-
- def view_cam(self, img_json): # 接受来自子线程1返回的摄像头数据
- self.img_json = img_json
- img = self.Decoder(self.img_json)
- frame = QImage(img, img.shape[1], img.shape[0], img.strides[0], QImage.Format_RGB888).rgbSwapped()
- pix = QPixmap.fromImage(frame)
- item = QGraphicsPixmapItem(pix) # 创建像素图元
- scene = QGraphicsScene() # 创建场景
- scene.addItem(item)
- self.cam_view.setScene(scene) # 将场景添加至视图
- self.cam_view.fitInView(item) # 自适应大小
-
- # 图片解码函数
- def Decoder(self, img_json):
- jpg_as_str = json.loads(img_json)['img_str']
- jpg_as_bytes = jpg_as_str.encode('ascii')
- jpg_original = base64.b64decode(jpg_as_bytes)
- jpg_as_np = np.frombuffer(jpg_original, dtype=np.uint8)
- img = cv2.imdecode(jpg_as_np, flags=1)
- return img
-
- if __name__ == '__main__':
- app = QApplication(sys.argv) # 创建对象
-
- w = MyWindow()
- # 展示窗口
- w.ui.show()
-
- # 程序进行循环等待状态
- app.exec_()

上述代码存在当关闭ui窗口时,子线程不会自动结束的问题,后续将补充如何解决这个问题的方案。
1130补充:上述问题的原因在于我把读视频的while true循环写在了run函数外,为了避免这个问题可以采用定时器读取视频流的方法,可以参考博主的一个学习笔记。