|
- import socket
- import cv2 as cv
- import sys
- import numpy as np
- from paddle.inference import Config
- from paddle.inference import PrecisionType
- from paddle.inference import create_predictor
- import yaml
- import time
-
# Open the camera through a GStreamer pipeline string (nvarguscamerasrc source).
# To use a plain camera index instead: cap = cv.VideoCapture(0)
cap = cv.VideoCapture("nvarguscamerasrc \
!video/x-raw(memory:NVMM), width=640, height=480, format=NV12, framerate=30/1\
!nvvidconv flip-method=0 ! videoconvert ! video/x-raw, format=BGR ! appsink")
# Request a 512x512 capture size.
# NOTE(review): whether these properties are honored for a pipeline-string
# capture is not verified here; the real frame size is read from the frame below.
cap.set(cv.CAP_PROP_FRAME_WIDTH, 512)
cap.set(cv.CAP_PROP_FRAME_HEIGHT, 512)

# Initialize the image-size related model inputs from one real frame.
ret, img = cap.read()
if not ret or img is None:
    # Fail with a clear message instead of an AttributeError on `img.shape`.
    cap.release()
    raise RuntimeError("failed to read an initial frame from the camera")

# Edge length (pixels) of the square image fed to the model.
im_size = 320
# (1, 2) float32 array holding the model input height and width.
im_shape = np.array([im_size, im_size]).reshape((1, 2)).astype(np.float32)
# (1, 2) float32 array: im_size / frame height, im_size / frame width.
scale_factor = np.array(
    [im_size * 1. / img.shape[0], im_size * 1. / img.shape[1]]
).reshape((1, 2)).astype(np.float32)
-
-
def resize(ximg, target_size):
    """Rescale an image so both edges map to ``target_size`` pixels.

    Width and height get independent scale factors, so the aspect ratio of
    the input is not preserved.

    Args:
        ximg: Image as a NumPy array; axis 0 is height, axis 1 is width.
        target_size: Desired edge length in pixels.

    Returns:
        The array returned by ``cv.resize`` for the computed scale factors.

    Raises:
        TypeError: If ``ximg`` is not a ``numpy.ndarray``.
    """
    if isinstance(ximg, np.ndarray):
        height, width = ximg.shape[0], ximg.shape[1]
        scale_x = float(target_size) / float(width)
        scale_y = float(target_size) / float(height)
        return cv.resize(ximg, None, None, fx=scale_x, fy=scale_y)
    raise TypeError('image type is not numpy.')
-
-
def normalize(ximg, mean, std):
    """Scale pixel values by 1/255, then standardize each channel.

    Computes ``(ximg / 255 - mean) / std`` with ``mean`` and ``std``
    broadcast along the last axis. The division by 255 allocates a new
    array, so the caller's ``ximg`` is left untouched; the subtract and
    divide steps then run in place on that new array, which keeps its dtype.

    Args:
        ximg: Image array whose last axis is the channel axis.
        mean: Per-channel means (sequence, one entry per channel).
        std: Per-channel standard deviations (same length as ``mean``).

    Returns:
        The normalized array, same shape as ``ximg``.
    """
    offset = np.array(mean)[None, None, :]
    divisor = np.array(std)[None, None, :]
    scaled = ximg / 255.0
    scaled -= offset
    scaled /= divisor
    return scaled
-
-
def preprocess(ximg, img_size):
    """Convert a BGR frame into a normalized, batched CHW float32 array.

    Steps: resize to ``img_size``, reverse the channel order (BGR -> RGB),
    cast to float32, normalize with fixed per-channel mean/std, move channels
    first (HWC -> CHW), and prepend a batch axis of length 1.

    Args:
        ximg: Input image as a NumPy array in height/width/channel layout.
        img_size: Edge length passed to ``resize``.

    Returns:
        Array with a leading batch axis followed by channel, height, width.
    """
    channel_mean = [0.485, 0.456, 0.406]
    channel_std = [0.229, 0.224, 0.225]

    resized = resize(ximg, img_size)
    rgb = resized[:, :, ::-1].astype('float32')  # bgr -> rgb
    standardized = normalize(rgb, channel_mean, channel_std)
    channels_first = standardized.transpose((2, 0, 1))  # hwc -> chw
    return channels_first[np.newaxis, :]
-
-
def predict_config(model_file, params_file):
    """Create a Paddle Inference predictor for the given model.

    Args:
        model_file: Path to the model structure file.
        params_file: Path to the model parameters file.

    Returns:
        The predictor built by ``create_predictor`` from the configuration.
    """
    # Tailor the Config to this deployment.
    config = Config()

    # Point the config at the model files.
    config.set_prog_file(model_file)
    config.set_params_file(params_file)

    # Config predicts on CPU by default; GPU use must be switched on
    # explicitly, giving the initial GPU memory to allocate and the GPU card id.
    config.enable_use_gpu(500, 0)

    # Turn on IR optimization and memory optimization.
    config.switch_ir_optim()
    config.enable_memory_optim()

    # TensorRT engine settings, collected in one place for readability.
    tensorrt_options = dict(
        workspace_size=1 << 30,
        precision_mode=PrecisionType.Float32,
        max_batch_size=1,
        min_subgraph_size=5,
        use_static=False,
        use_calib_mode=False,
    )
    config.enable_tensorrt_engine(**tensorrt_options)

    return create_predictor(config)
-
-
def predict(predictor, img):
    """Feed the inputs to the predictor, run it, and collect all outputs.

    Args:
        predictor: Predictor exposing ``get_input_names``,
            ``get_input_handle``, ``run``, ``get_output_names`` and
            ``get_output_handle``.
        img: Sequence of arrays, one per model input, in the same order as
            ``predictor.get_input_names()``.

    Returns:
        A list with one entry per model output, each being the value
        returned by that output handle's ``copy_to_cpu()``.
    """
    # Bind each input array to the matching input handle.
    for idx, name in enumerate(predictor.get_input_names()):
        handle = predictor.get_input_handle(name)
        array = img[idx]
        handle.reshape(array.shape)
        # Pass a copy of the array to the input handle.
        handle.copy_from_cpu(array.copy())

    # Execute the predictor.
    predictor.run()

    # Fetch every output back to host memory.
    return [
        predictor.get_output_handle(name).copy_to_cpu()
        for name in predictor.get_output_names()
    ]
-
-
def draw_bbox_image(frame, result, label_list, threshold=0.5):
    """Annotate the first confident detection on ``frame`` and return its crop.

    Rows of ``result`` are scanned in order. Rows scoring below ``threshold``
    are skipped, as are boxes that have no area inside the frame. For the
    first remaining row the box, label and score are drawn on ``frame`` and
    the boxed region is returned.

    Args:
        frame: Image array indexed as ``[row, column, ...]``; it is drawn on
            in place.
        result: Iterable of detection rows laid out as
            ``(class_id, score, xmin, ymin, xmax, ymax)``.
        label_list: Lookup from integer class id to label text.
        threshold: Minimum score a detection needs to be used.

    Returns:
        The slice of ``frame`` covering the chosen box (clamped to the frame
        bounds), or ``None`` when no detection qualifies.
    """
    for res in result:
        cat_id, score = res[0], res[1]
        if score < threshold:
            continue

        xmin, ymin, xmax, ymax = (int(v) for v in res[2:])

        # Clamp the crop to the frame: a negative start index in a slice
        # counts from the far edge and would yield a wrong or empty region.
        frame_h, frame_w = frame.shape[0], frame.shape[1]
        crop_x0, crop_y0 = max(xmin, 0), max(ymin, 0)
        crop_x1, crop_y1 = min(xmax, frame_w), min(ymax, frame_h)
        if crop_x1 <= crop_x0 or crop_y1 <= crop_y0:
            # Nothing of this box lies inside the frame; an empty crop would
            # be useless to the caller, so try the next detection.
            continue

        # The crop is taken before drawing; it is a slice of `frame`, not a copy.
        ximg = frame[crop_y0:crop_y1, crop_x0:crop_x1]

        cv.rectangle(frame, (xmin, ymin), (xmax, ymax), (255, 0, 255), 2)
        try:
            label_id = label_list[int(cat_id)]
        except (KeyError, IndexError):
            # Unknown class id: keep the box but omit the text overlay.
            pass
        else:
            # cv.putText(image, text, (x, y), font, scale, (b, g, r), thickness)
            cv.putText(frame, label_id, (xmin, ymin - 2),
                       cv.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)
            cv.putText(frame, str(round(score, 2)), (xmin - 35, ymin - 2),
                       cv.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

        return ximg

    return None
-
-
def socket_client(host='192.168.219.216', port=8080):
    """Detect objects on camera frames and stream JPEG crops to a TCP server.

    Connects to ``host:port``, then loops: read a frame from the module-level
    ``cap``, run it through ``preprocess``/``predict``/``draw_bbox_image``,
    and, when a crop is returned, JPEG-encode it and write the bytes to the
    socket. The annotated frame is shown in a window; pressing 'q' stops the
    loop. JPEG payloads are written back-to-back with no length prefix or
    delimiter.

    Relies on the module-level names ``cap``, ``im_size``, ``im_shape``,
    ``scale_factor``, ``predictor`` and ``label_list``.

    Args:
        host: Server address to connect to.
        port: Server TCP port.

    Exits the process with status 1 if the connection cannot be established.
    """
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.connect((host, port))  # connect to the server
    except socket.error as msg:
        print(msg)
        sys.exit(1)
    print('this is Client')

    try:
        while True:
            # Grab a frame; stop cleanly if the camera yields nothing.
            ret, frame = cap.read()
            if not ret or frame is None:
                print('failed to read frame from camera')
                break

            data = preprocess(frame, im_size)

            # Run inference.
            result = predict(predictor, [im_shape, data, scale_factor])

            ximg = draw_bbox_image(frame, result[0], label_list, threshold=0.85)

            if ximg is not None:
                # '.jpg' selects JPEG encoding; other formats encode differently.
                ok, img_encode = cv.imencode('.jpg', ximg)
                if ok:
                    str_encode = img_encode.tobytes()
                    print('img size : %s' % str(len(str_encode)))
                    try:
                        # sendall keeps writing until the whole buffer is sent;
                        # plain send() may transmit only part of it.
                        s.sendall(str_encode)
                    except socket.error as e:
                        # Best effort: report and keep processing frames.
                        print(e)

            cv.imshow("Client_show", frame)
            # Poll the keyboard once per frame so a 'q' press is not consumed
            # by a discarded waitKey call.
            if cv.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        s.close()
-
-
if __name__ == "__main__":
    # Read the label list from infer_cfg.yml; the context manager makes sure
    # the file handle is closed once parsing is done.
    with open('./ppyolo_tiny_650e_coco/infer_cfg.yml', encoding='utf-8') as infer_cfg:
        yaml_reader = yaml.safe_load(infer_cfg)
    label_list = yaml_reader['label_list']

    # Model file locations.
    model_file = "./ppyolo_tiny_650e_coco/model.pdmodel"
    params_file = "./ppyolo_tiny_650e_coco/model.pdiparams"

    # Build the predictor (read as a module-level name by socket_client),
    # then start the capture/inference/streaming loop.
    predictor = predict_config(model_file, params_file)
    socket_client()
|