|
- import datetime
- import os
- import sys
-
- from pathlib import Path
- import cv2
- from PIL import Image, ImageDraw, ImageFont
-
- from PyQt5.QtWidgets import QMainWindow, QFileDialog, QWidget
- from PyQt5 import QtCore, QtGui
-
- from PyQt5.QtWidgets import QApplication
-
- from PyQt5.QtCore import pyqtSignal, QThread
- from PyQt5.QtGui import QImage, QPixmap
- from mask import Ui_MainWindow
- import time
- import numpy as np
- import torch
- import torch.backends.cudnn as cudnn
-
- FILE = Path(__file__).absolute()
- sys.path.append(FILE.parents[0].as_posix()) # add yolov5/ to path
-
- from models.experimental import attempt_load
- from utils.datasets import LoadImages, LoadWebcam
# NOTE: LoadWebcam's last return value has been changed to self.cap
- from utils.general import check_img_size, check_requirements, check_imshow, colorstr, non_max_suppression, \
- apply_classifier, scale_coords, xyxy2xywh, strip_optimizer, set_logging, increment_path, save_one_box
- from utils.plots import colors, plot_one_box
- from utils.torch_utils import select_device, load_classifier, time_sync
-
- from tishiyin import MP3Player
-
-
class DetThread(QThread):
    """Worker thread that runs YOLOv5 mask detection.

    Loads the model weights, iterates frames from a webcam (or a media
    path), runs inference + NMS, draws detection boxes, and emits each
    annotated frame through a Qt signal for the UI to display.
    """

    # Emits the annotated BGR frame (numpy array) after each inference step.
    send_img = pyqtSignal(np.ndarray)
    # Emits per-class detection counts; the emit call in run() is currently
    # commented out, so this signal is wired but never fired.
    send_statistic = pyqtSignal(dict)

    def __init__(self):
        super(DetThread, self).__init__()
        self.weights = './weights/mask_with_CBAMC3.pt'  # model checkpoint path
        self.source = '0'  # '0' selects the default webcam; otherwise a media path
        self.conf_thres = 0.5  # NMS confidence threshold (updated from the UI slider)
        # NOTE: misspelled ("falg"), but read by mask_ui elsewhere in this
        # file — renaming it here would break those readers. 1 while frames
        # are being produced, 0 otherwise.
        self.show_falg = 0
        # Voice prompts; file names suggest sound1 = "please wear a mask"
        # and sound2 = "worn correctly" — TODO confirm against the MP3s.
        self.sound1 = MP3Player('提示音/佩戴口罩.mp3')
        self.sound2 = MP3Player('提示音/正确佩戴.mp3')
        self.sound_flag = False  # voice prompts enabled (toggled from the UI)
        # Far-future sentinel: makes (time.time() - start) negative, which
        # play_sound() treats as "ready to play the next prompt".
        self.sound_time_start = 100000000000
        self.vid_cap = None  # capture object of the current source (set in run())

    @torch.no_grad()
    def run(self,
            imgsz=640,  # inference size (pixels)
            iou_thres=0.45,  # NMS IOU threshold
            max_det=1000,  # maximum detections per image
            device='',  # cuda device, i.e. 0 or 0,1,2,3 or cpu
            view_img=True,  # show results
            save_txt=False,  # save results to *.txt
            save_conf=False,  # save confidences in --save-txt labels
            save_crop=False,  # save cropped prediction boxes
            nosave=False,  # do not save images/videos
            classes=None,  # filter by class: --class 0, or --class 0 2 3
            agnostic_nms=True,  # class-agnostic NMS
            augment=False,  # augmented inference
            visualize=False,  # visualize features
            update=False,  # update all models
            project='runs/detect',  # save results to project/name
            name='exp',  # save results to project/name
            exist_ok=False,  # existing project/name ok, do not increment
            line_thickness=3,  # bounding box thickness (pixels)
            hide_labels=False,  # hide labels
            hide_conf=False,  # hide confidences
            half=False,  # use FP16 half-precision inference
            ):
        """Main detection loop; runs until the thread is terminated."""
        print('run')

        # Initialize: prefer GPU 0 when CUDA is available.
        device = '0' if torch.cuda.is_available() else device
        device = select_device(device)
        half &= device.type != 'cpu'  # half precision only supported on CUDA

        # Load model
        model = attempt_load(self.weights, map_location=device)  # load FP32 model
        num_params = 0
        for param in model.parameters():
            num_params += param.numel()
        stride = int(model.stride.max())  # model stride
        imgsz = check_img_size(imgsz, s=stride)  # check image size
        names = model.module.names if hasattr(model, 'module') else model.names  # get class names
        if half:
            model.half()  # to FP16

        # Dataloader: numeric source means webcam index, anything else a path.
        if self.source.isnumeric():
            cudnn.benchmark = True  # set True to speed up constant image size inference
            dataset = LoadWebcam(self.source, img_size=imgsz, stride=stride)
            bs = len(dataset)  # batch_size
        else:
            dataset = LoadImages(self.source, img_size=imgsz, stride=stride)
        # Run inference
        if device.type != 'cpu':
            # Warm-up pass with a zero tensor so the first real frame is fast.
            model(torch.zeros(1, 3, imgsz, imgsz).to(device).type_as(next(model.parameters())))  # run once
        for path, img, im0s, vid_cap in dataset:
            # Expose the capture so the UI can query fps/size and release it.
            self.vid_cap = vid_cap
            self.show_falg = 1
            statistic_dic = {name: 0 for name in names}
            img = torch.from_numpy(img).to(device)
            img = img.half() if half else img.float()  # uint8 to fp16/32
            img /= 255.0  # 0 - 255 to 0.0 - 1.0
            if img.ndimension() == 3:
                img = img.unsqueeze(0)

            pred = model(img, augment=augment)[0]
            # Apply NMS
            pred = non_max_suppression(pred, self.conf_thres, iou_thres, classes, agnostic_nms, max_det=max_det)

            # Process detections
            for i, det in enumerate(pred):  # detections per image
                # Possibly trigger a voice prompt for this frame's detections.
                self.play_sound(det)

                im0 = im0s.copy()
                if len(det):
                    # Rescale boxes from img_size to im0 size
                    det[:, :4] = scale_coords(img.shape[2:], det[:, :4], im0.shape).round()

                    # Write results
                    for *xyxy, conf, cls in reversed(det):
                        c = int(cls)  # integer class
                        statistic_dic[names[c]] += 1
                        label = None if hide_labels else (names[c] if hide_conf else f'{names[c]} {conf:.2f}')
                        plot_one_box(xyxy, im0, label=label, color=colors(c, True), line_thickness=line_thickness)

            # Throttle emission to roughly 40 fps.
            time.sleep(1 / 40)
            self.send_img.emit(im0)
            # self.send_statistic.emit(statistic_dic)

    # Play voice prompts for detections, with a 5-second cooldown.
    def play_sound(self, det):
        """Play a voice prompt when a detection row (class index in det_[-1],
        confidence in det_[-2]) exceeds 0.7 confidence.

        Cooldown scheme: sound_time_start normally holds a far-future
        sentinel, so (now - start) is negative and a prompt may fire; firing
        records the real timestamp, blocking further prompts until 5 seconds
        have elapsed, at which point the sentinel is restored.
        """
        # Process detections only when prompts are enabled and any exist.
        if self.sound_flag and len(det) != 0:
            for det_ in det:
                # Class 2 — presumably "mask worn correctly" (sound2); confirm
                # against the model's class list.
                if det_[-1] == 2 and det_[-2] > 0.7:
                    if time.time() - self.sound_time_start < 0:
                        self.sound_time_start = time.time()
                        self.sound2.playSound()
                    # One prompt every 5 seconds
                    elif time.time() - self.sound_time_start > 5:
                        self.sound_time_start = 100000000000
                # Class 0 — presumably "no mask" (sound1); confirm likewise.
                elif det_[-1] == 0 and det_[-2] > 0.7:
                    if time.time() - self.sound_time_start < 0:
                        self.sound_time_start = time.time()
                        self.sound1.playSound()

                    elif time.time() - self.sound_time_start > 5:
                        self.sound_time_start = 100000000000
-
-
class mask_ui(QMainWindow, Ui_MainWindow):
    """Main window for the mask-detection app.

    Wires the UI controls to the DetThread worker, displays annotated
    frames, and handles optional video recording and voice alerts.
    """

    def __init__(self):
        super(mask_ui, self).__init__()
        self.setupUi(self)

        # Detection worker; its source is set when Start is pressed.
        self.det_thread = DetThread()
        self.det_thread.source = None
        self.det_thread.send_img.connect(lambda x: self.show_image(x, self.VF))
        self.det_thread.send_statistic.connect(self.show_statistic)

        self.flag = False  # True once at least one frame has been displayed
        self.jiemian = "./icon/界面.jpg"  # idle splash image
        self.VF.setPixmap(QPixmap(self.jiemian))
        self.VF.setScaledContents(True)

        # Directory for recorded videos; exist_ok avoids the old
        # exists()+mkdir() race.
        self.video_save_path = 'save_video'
        os.makedirs(self.video_save_path, exist_ok=True)
        self.vid_writer = None  # cv2.VideoWriter while recording, else None

        # Button 1: start/stop the webcam detection.
        self.pushButton.clicked.connect(self.camera)
        # Button 2: toggle voice prompts.
        self.pushButton_2.clicked.connect(self.set_sound_flag)
        # Button 3: toggle video recording.
        self.pushButton_3.clicked.connect(self.save_video)

        # Keep slider and spinbox in sync; either one updates conf_thres.
        self.zhixindu_bar.valueChanged.connect(lambda: self.conf_change(self.zhixindu_bar))
        self.spinBox.valueChanged.connect(lambda: self.conf_change(self.spinBox))

        # Periodic status-bar message while detection is running.
        self.timer1 = QtCore.QTimer()
        self.timer1.start(7000)
        self.timer1.timeout.connect(self.show_camera_message)

        # Periodic proportional re-layout on window resize.
        self.timer2 = QtCore.QTimer()
        self.timer2.start(100)
        self.timer2.timeout.connect(self.adaption_size)

        # Baseline widget sizes captured at startup; adaption_size scales
        # everything relative to these.
        self.window_size = {'window': (self.size().width(), self.size().height()),
                            'pushButton': (self.pushButton.size().width(), self.pushButton.size().height()),
                            'pushButton_2': (self.pushButton_2.size().width(), self.pushButton_2.size().height()),
                            'pushButton_3': (self.pushButton_3.size().width(), self.pushButton_3.size().height()),
                            'spinBox': (self.spinBox.size().width(), self.spinBox.size().height()),
                            'frame_3': (self.frame_3.size().width(), self.frame_3.size().height()),
                            'zhixindu_bar': (self.zhixindu_bar.size().width(), self.zhixindu_bar.size().height()),
                            'VF': (self.VF.size().width(), self.VF.size().height()),
                            'groupBox': (self.groupBox.size().width(), self.groupBox.size().height()), }

    def show_camera_message(self):
        """Refresh the status bar while the detection thread is producing frames."""
        if self.det_thread.show_falg == 1:
            if self.vid_writer:
                self.statusbar.showMessage('正在录制中。。。')
            else:
                self.statusbar.showMessage('正在检测中。。。')

    def camera(self):
        """Toggle webcam detection via the Start/Stop button."""
        if self.pushButton.text() == 'Start' and not self.flag:
            self.pushButton.setCheckable(False)
            self.det_thread.source = '0'
            self.det_thread.start()
            self.statusbar.showMessage('模型加载中。。。')
        else:
            self.det_thread.terminate()
            self.det_thread.show_falg = 0
            self.statusbar.showMessage('已停止检测')
            self.VF.setPixmap(QPixmap(self.jiemian))
            self.flag = False
            self.pushButton.setText('Start')
            self.det_thread.source = None
            self.pushButton.setCheckable(False)
            # BUGFIX: vid_cap is None until the thread has produced a frame,
            # so hasattr() alone was not enough — stopping before the first
            # frame raised AttributeError. Guard on the value, matching
            # closeEvent().
            if getattr(self.det_thread, 'vid_cap', None):
                self.det_thread.vid_cap.release()

    def show_image(self, img_src, label):
        """Letterbox `img_src` into `label`, stamp the current time, write the
        frame to the recorder when active, and display it.

        Connected to DetThread.send_img; `img_src` is a BGR numpy frame.
        """
        try:
            ih, iw, _ = img_src.shape
            w = label.geometry().width()
            h = label.geometry().height()
            # Keep the aspect ratio by scaling along the longer side.
            if iw > ih:
                scal = w / iw
                nw, nh = w, int(scal * ih)
            else:
                scal = h / ih
                nw, nh = int(scal * iw), h
            img_src_ = cv2.resize(img_src, (nw, nh))

            # Stamp the current date/time onto the frame (PIL handles the
            # TrueType font).
            Text = f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
            img_src_ = self.show_Text(img_src_, Text, '微软雅黑Bold.ttf', 20, 10)

            if self.vid_writer:
                # Recording: frames are always written at 640x480 — the writer
                # is opened with the same size in set_video_name_and_path().
                self.vid_writer.write(cv2.resize(img_src_, (640, 480)))

            frame = cv2.cvtColor(img_src_, cv2.COLOR_BGR2RGB)
            img = QImage(frame.data, frame.shape[1], frame.shape[0],
                         frame.shape[2] * frame.shape[1], QImage.Format_RGB888)
            label.setPixmap(QPixmap.fromImage(img))

            # First frame displayed: detection is live, flip Start -> Stop.
            self.flag = True
            self.pushButton.setText('Stop')
        except Exception as e:
            print(repr(e))

    def set_video_name_and_path(self):
        """Open a cv2.VideoWriter named after the current local time."""
        # Current system time, used as the video file name.
        now = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime(time.time()))
        if self.det_thread.vid_cap:
            save_path = self.video_save_path + '/' + now + '.mp4'
            # BUGFIX: the writer was previously opened with the capture's own
            # (w, h), but show_image() writes frames resized to 640x480 —
            # whenever the capture size differed the output file was broken.
            # Open the writer at the size actually written. Recording rate is
            # fixed at 15 fps regardless of the capture's reported fps.
            self.vid_writer = cv2.VideoWriter(save_path, cv2.VideoWriter_fourcc(*'mp4v'), 15, (640, 480))

    def show_Text(self, frame, Text, typeface, left, bottom, size=20):  # draw text
        """Draw `Text` onto a BGR frame via PIL (supports CJK glyphs that
        cv2.putText cannot render) and return a new BGR frame.

        `size` generalizes the previously hard-coded 20 pt font size; the
        default preserves existing behavior.
        """
        # cv2 stores BGR while PIL expects RGB, so convert before drawing.
        pil_img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        pilimg = Image.fromarray(pil_img)  # array -> PIL image
        draw = ImageDraw.Draw(pilimg)  # draw (CJK-capable) text on the PIL image
        font = ImageFont.truetype(typeface, size, encoding="utf-8")
        draw.text((left - 10, bottom - 10), Text, (50, 50, 50), font=font)
        # Back to a BGR array cv2/Qt can consume.
        cv2img = cv2.cvtColor(np.array(pilimg), cv2.COLOR_RGB2BGR)
        return cv2img

    # Change the confidence threshold
    def conf_change(self, method):
        """Sync slider and spinbox, then push the new threshold (0..1) to the
        detection thread."""
        if method == self.zhixindu_bar:
            self.spinBox.setValue(self.zhixindu_bar.value())
        if method == self.spinBox:
            self.zhixindu_bar.setValue(self.spinBox.value())
        self.det_thread.conf_thres = self.zhixindu_bar.value() / 100
        self.statusbar.showMessage("置信度已更改为:" + str(self.det_thread.conf_thres))

    def closeEvent(self, event):
        """Stop the worker and release capture/writer resources on exit."""
        self.det_thread.terminate()
        if hasattr(self.det_thread, 'vid_cap') and self.det_thread.vid_cap:
            self.det_thread.vid_cap.release()
        if self.vid_writer:
            self.vid_writer.release()
        super().closeEvent(event)

    def show_statistic(self, statistic_dic):
        """Render non-zero per-class counts (descending) into the list widget."""
        try:
            statistic_dic = sorted(statistic_dic.items(), key=lambda x: x[1], reverse=True)
            statistic_dic = [i for i in statistic_dic if i[1] > 0]
            results = [str(i[0]) + ':' + str(i[1]) for i in statistic_dic]
            self.listWidget.addItems(results)
        except Exception as e:
            print(repr(e))

    def set_sound_flag(self):
        """Toggle voice prompts on the playSound/stopPlay button."""
        if self.pushButton_2.text() == 'playSound':
            self.det_thread.sound_flag = True
            self.pushButton_2.setText('stopPlay')
        else:
            self.det_thread.sound_flag = False
            self.pushButton_2.setText('playSound')

    def save_video(self):
        """Toggle video recording; only allowed while detection is running."""
        if self.pushButton_3.text() == 'recording':
            if self.flag:
                if self.vid_writer is None:
                    self.set_video_name_and_path()
                self.pushButton_3.setText('stopRecord')
            else:
                self.statusbar.showMessage('请先启动检测!')
        else:
            self.vid_writer.release()
            self.statusbar.showMessage(f"录制视频已保存在:{os.path.abspath('.')}\\{self.video_save_path}中!")
            self.vid_writer = None
            # NOTE(review): stopping a recording also disables voice prompts
            # here; this looks like a copy-paste leftover — confirm intent.
            self.det_thread.sound_flag = False
            self.pushButton_3.setText('recording')

    def adaption_size(self):
        """Rescale and reposition widgets proportionally to the current window
        size relative to the startup baseline in self.window_size."""
        w, h = self.size().width(), self.size().height()
        w_change = w / self.window_size['window'][0]
        h_change = h / self.window_size['window'][1]

        def scaled(key):
            # Baseline size of `key` scaled by the current resize factors.
            bw, bh = self.window_size[key]
            return int(bw * w_change), int(bh * h_change)

        self.pushButton.setFixedSize(*scaled('pushButton'))
        self.pushButton_2.setFixedSize(*scaled('pushButton_2'))
        # Buttons 2 and 3 hug the right edge of their left neighbor.
        self.pushButton_2.move(self.pushButton.pos().x() + self.pushButton.size().width() - 3,
                               self.pushButton.pos().y())
        self.pushButton_3.setFixedSize(*scaled('pushButton_3'))
        self.pushButton_3.move(self.pushButton_2.pos().x() + self.pushButton_2.size().width() - 3,
                               self.pushButton_2.pos().y())
        self.spinBox.setFixedSize(*scaled('spinBox'))
        self.frame_3.setFixedSize(*scaled('frame_3'))
        self.zhixindu_bar.setFixedSize(*scaled('zhixindu_bar'))
        # Slider sits just right of the spinbox.
        self.zhixindu_bar.move(self.spinBox.pos().x() + self.spinBox.size().width() + 5,
                               self.spinBox.pos().y())
        self.VF.setFixedSize(*scaled('VF'))
        self.groupBox.setFixedSize(*scaled('groupBox'))
-
if __name__ == '__main__':
    # Build the Qt application, show the main window, and hand control to
    # the event loop; its exit code becomes the process exit status.
    app = QApplication(sys.argv)
    window = mask_ui()
    window.show()
    sys.exit(app.exec_())
|