|
- # Maki Hojo Ryu Enami Marina Matsumoto
-
- import cv2
- import numpy as np
- from glob import glob
- from tqdm import tqdm
- from numpy.linalg import norm
- from facex import FaceX
- from facex.model_zoo import get_model
- from facex.face_align import norm_crop
-
def warp_face_by_bounding_box(image, bounding_box, crop_size):
    """Warp the bounding-box region of ``image`` onto a ``crop_size`` canvas.

    The affine transform is defined by three corner correspondences: the
    box's top-left, top-right and bottom-left corners are mapped to the
    matching corners of the output crop.

    Args:
        image: Source image handed to ``cv2.warpAffine``.
        bounding_box: Indexable ``(x1, y1, x2, y2)`` box in image coordinates.
        crop_size: Indexable ``(width, height)`` of the output; it is also
            passed unchanged to ``cv2.warpAffine`` as the destination size.

    Returns:
        A ``(crop, affine_matrix)`` pair: the warped crop and the matrix
        returned by ``cv2.getAffineTransform``.
    """
    left, top = bounding_box[0], bounding_box[1]
    right, bottom = bounding_box[2], bounding_box[3]
    out_width, out_height = crop_size[0], crop_size[1]

    box_corners = np.array(
        [[left, top], [right, top], [left, bottom]],
        dtype=np.float32,
    )
    crop_corners = np.array(
        [[0, 0], [out_width, 0], [0, out_height]],
        dtype=np.float32,
    )
    affine_matrix = cv2.getAffineTransform(box_corners, crop_corners)

    # Area interpolation when the box exceeds the crop in either dimension
    # (the warp shrinks the region); linear interpolation otherwise.
    box_exceeds_crop = right - left > out_width or bottom - top > out_height
    interpolation = cv2.INTER_AREA if box_exceeds_crop else cv2.INTER_LINEAR

    warped = cv2.warpAffine(image, affine_matrix, crop_size, flags=interpolation)
    return warped, affine_matrix
-
class FaceRetrieval:
    """In-memory gallery of named embeddings, matched by cosine similarity.

    Embeddings are flattened to 1-D and scaled to unit length when they are
    added, and queries receive the same treatment, so the dot product used
    for matching is a cosine similarity.

    Attributes are plain public fields:
        names: labels, in insertion order.
        embs: unit-length 1-D embeddings, parallel to ``names``.
        default: label returned when nothing matches.
        thrs: minimum similarity for a gallery entry to count as a match.
    """

    @staticmethod
    def compute_sim(feat1: np.ndarray, feat2: np.ndarray):
        """Return the dot product of the two flattened feature arrays.

        No normalisation happens here, so the result is a cosine similarity
        only when both inputs are already unit length.
        """
        return np.dot(feat1.ravel(), feat2.ravel())

    def __init__(self, thrs: float = 0.6, default=""):
        """Create an empty gallery.

        Args:
            thrs: Similarity threshold; scores below it are reported as misses.
            default: Label returned by :meth:`find` on a miss.
        """
        self.names = []
        self.embs = []
        self.default = default
        self.thrs = thrs

    @staticmethod
    def _unit_vector(emb):
        """Flatten ``emb`` and scale it to unit length.

        Returns ``None`` when the vector has no usable direction (empty,
        all zeros, or containing NaN/inf), because dividing by such a norm
        would only produce NaN values.
        """
        flat = np.asarray(emb).ravel()
        length = norm(flat)
        if not np.isfinite(length) or length == 0:
            return None
        return flat / length

    def add(self, name, emb):
        """Register ``emb`` under ``name``.

        Args:
            name: Label to return when this embedding is the best match.
            emb: Embedding of any shape; it is flattened before storage.

        Raises:
            ValueError: If the embedding has a zero or non-finite norm. A
                stored NaN vector would otherwise corrupt every later lookup.
        """
        unit = self._unit_vector(emb)
        if unit is None:
            raise ValueError(
                f"cannot register {name!r}: embedding has zero or non-finite norm"
            )
        # Append both lists only after validation so they never go out of sync.
        self.names.append(name)
        self.embs.append(unit)

    def find(self, emb):
        """Look up the gallery entry most similar to ``emb``.

        Args:
            emb: Query embedding of any shape; it is flattened and normalised.

        Returns:
            ``(name, score)`` for the best-scoring entry when its similarity
            reaches ``self.thrs``; otherwise ``(self.default, 0)``. The miss
            result is also returned for an empty gallery and for a query with
            a zero or non-finite norm.
        """
        if not self.embs:
            return self.default, 0

        query = self._unit_vector(emb)
        if query is None:
            return self.default, 0

        gallery = np.stack(self.embs, axis=0)
        sim = np.dot(gallery, query)
        best = int(np.argmax(sim))
        score = sim[best]
        # Written as "not >=" so that a NaN score (which compares False both
        # ways) is treated as a miss rather than slipping through as a match.
        if not score >= self.thrs:
            return self.default, 0
        return self.names[best], score
-
def decode_fourcc(v):
    """Convert a numeric FOURCC code into its four-character string.

    ``v`` is truncated to an ``int``; its four lowest bytes are then turned
    into characters, least-significant byte first.
    """
    code = int(v)
    byte_values = ((code >> shift) & 0xFF for shift in range(0, 32, 8))
    return "".join(map(chr, byte_values))
-
class VideoProcessor:
    """Annotate the faces in a video with names from a small gallery.

    A ``FaceX`` instance supplies detection and recognition; known people are
    registered with :meth:`add`, and calling the processor on a video writes
    an annotated copy.
    """

    def __init__(self, providers=None, det_size=(640, 640), thrs: float = 0.4):
        """Set up the face model and an empty gallery.

        Args:
            providers: Execution providers passed to ``FaceX``. Defaults to
                ``['CUDAExecutionProvider']``.
            det_size: Detection input size passed to ``FaceX.prepare``.
            thrs: Similarity threshold for the ``FaceRetrieval`` gallery.
        """
        if providers is None:
            providers = ['CUDAExecutionProvider']

        self.facex = FaceX(
            providers=list(providers),
            tasks=[
                "detection",
                "recognition",
            ],
        )
        self.facex.prepare(ctx_id=0, det_size=tuple(det_size))

        self.facedb = FaceRetrieval(thrs=thrs)
        # Container file extension to use for each supported output codec.
        self.fourcc_suffix = {
            "XVID": ".avi",
            "MP4V": ".MP4"
        }

    def add(self, im: np.ndarray, name: str):
        """Register the first face detected in ``im`` under ``name``.

        Args:
            im: Image to run through ``self.facex.get``.
            name: Label to attach to the resulting embedding.

        Raises:
            ValueError: If ``im`` is ``None`` (``cv2.imread`` returns ``None``
                for an unreadable file) or no face is detected in it.
        """
        if im is None:
            raise ValueError(f"no image data supplied for {name!r}")
        faces = self.facex.get(im)
        if len(faces) == 0:
            raise ValueError(f"no face detected in the image supplied for {name!r}")
        self.facedb.add(name, faces[0].embedding)

    def process_frame(self, frame: np.ndarray, idx: int):
        """Draw a box around every detected face and label the recognised ones.

        Args:
            frame: Frame to annotate; the drawing calls write into it.
            idx: Index of the frame in the video. Currently unused; kept so
                existing callers keep working.

        Returns:
            The annotated frame.
        """
        font = cv2.FONT_HERSHEY_SIMPLEX
        for face in self.facex.get(frame):
            # Plain Python ints keep the drawing calls independent of how the
            # installed OpenCV build treats NumPy integer scalars.
            x1, y1, x2, y2 = np.round(face.bbox).astype(int).tolist()
            frame = cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 3, lineType=cv2.LINE_AA)
            name, _ = self.facedb.find(face.embedding)
            if name != self.facedb.default:
                frame = cv2.putText(frame, name, (x1, y1 - 5), font, 1.0, (0, 255, 0), 2)

        return frame

    @staticmethod
    def _open_capture(video_path):
        """Open ``video_path``, print its properties, and return them with the capture.

        Returns:
            ``(capture, resolution, fps, num_frames, fourcc)``. The caller is
            responsible for releasing ``capture``.

        Raises:
            OSError: If the video cannot be opened.
        """
        capture = cv2.VideoCapture(video_path)
        if not capture.isOpened():
            capture.release()
            raise OSError(f"open {video_path} failed!")

        # Keep the frame rate as a float: truncating e.g. 29.97 to 29 makes
        # the re-encoded video play at the wrong speed.
        fps = capture.get(cv2.CAP_PROP_FPS)
        resolution = (
            int(capture.get(cv2.CAP_PROP_FRAME_WIDTH)),
            int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT)),
        )
        num_frames = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
        fourcc = decode_fourcc(capture.get(cv2.CAP_PROP_FOURCC))
        print("fps:", fps)
        print("resolution:", resolution)
        print("num frames:", num_frames)
        print("fourcc: ", fourcc)
        return capture, resolution, fps, num_frames, fourcc

    @staticmethod
    def _open_writer(video_path, resolution, fps, fourcc):
        """Create a ``cv2.VideoWriter`` for ``video_path``; raise ``OSError`` if it fails to open."""
        writer = cv2.VideoWriter(video_path, cv2.VideoWriter_fourcc(*fourcc), fps, resolution)
        if not writer.isOpened():
            writer.release()
            raise OSError(f"open {video_path} for writing failed (fourcc {fourcc!r})")
        return writer

    def read_video(self, video_path):
        """Decode every frame of ``video_path`` into memory.

        All frames are held in one list, so memory use grows with the length
        of the video; :meth:`__call__` streams instead.

        Returns:
            ``(frames, resolution, fps, fourcc)`` where ``resolution`` is
            ``(width, height)`` and ``fourcc`` is the decoded codec string.

        Raises:
            OSError: If the video cannot be opened.
        """
        capture, resolution, fps, num_frames, fourcc = self._open_capture(video_path)
        frames = []
        # A non-positive frame count carries no usable total for the bar.
        pbar = tqdm(total=num_frames if num_frames > 0 else None, ncols=100, desc="Read")
        try:
            while True:
                ok, frame = capture.read()
                if not ok or frame is None:
                    break
                frames.append(frame)
                pbar.update(1)
        finally:
            pbar.close()
            capture.release()

        return frames, resolution, fps, fourcc

    def write_video(self, frames, video_path, resolution, fps, fourcc):
        """Encode ``frames`` to ``video_path``.

        Args:
            frames: Iterable of frames to write, in order.
            video_path: Destination file path.
            resolution: ``(width, height)`` given to the writer.
            fps: Frame rate given to the writer.
            fourcc: Four-character codec string, e.g. ``"XVID"``.

        Raises:
            OSError: If the writer cannot be opened.
        """
        writer = self._open_writer(video_path, resolution, fps, fourcc)
        try:
            for frame in tqdm(frames, ncols=100, desc="Write"):
                writer.write(frame)
        finally:
            writer.release()

    def __call__(self, video_path: str, save_path: str, fourcc: str = "XVID"):
        """Annotate ``video_path`` and write the result next to ``save_path``.

        Frames are read, processed and written one at a time, so memory use
        does not depend on the length of the video.

        Args:
            video_path: Input video.
            save_path: Output path without extension; the extension is taken
                from ``self.fourcc_suffix`` for the chosen codec.
            fourcc: Output codec; must be a key of ``self.fourcc_suffix``.

        Returns:
            The path of the written video (``save_path`` plus the suffix).

        Raises:
            ValueError: If ``fourcc`` has no registered suffix.
            OSError: If the input cannot be opened or the output cannot be
                created.
        """
        if fourcc not in self.fourcc_suffix:
            raise ValueError(
                f"unsupported fourcc {fourcc!r}; expected one of {sorted(self.fourcc_suffix)}"
            )
        output_path = save_path + self.fourcc_suffix[fourcc]

        capture, resolution, fps, num_frames, _ = self._open_capture(video_path)
        try:
            writer = self._open_writer(output_path, resolution, fps, fourcc)
            pbar = tqdm(total=num_frames if num_frames > 0 else None, ncols=100, desc="Process")
            try:
                idx = 0
                while True:
                    ok, frame = capture.read()
                    if not ok or frame is None:
                        break
                    writer.write(self.process_frame(frame, idx))
                    idx += 1
                    pbar.update(1)
            finally:
                pbar.close()
                writer.release()
        finally:
            capture.release()

        return output_path
-
-
# Build the processor, then register one reference photo per known person.
video_processor = VideoProcessor()
for image_path, person in (
    ("/root/1.jpg", "Maki Hojo"),
    ("/root/2.jpg", "Ryu Enami"),
    ("/root/3.jpg", "Marina Matsumoto"),
):
    video_processor.add(cv2.imread(image_path), person)

# Annotate the input video; the output path is given without an extension.
results = video_processor("/root/010116_220.1080p.mp4", "/root/output.1080p")
-
|