|
- import time
- from sklearn import metrics
- import torch
- import numpy as np
- import cv2
- import glob
- import os
- import scipy.io as scio
- from models.unet import UNet
-
class test_dataset:
    """Sliding-window loader over one test video's frames.

    Each item is a numpy array of ``clip_length`` consecutive frames stacked
    along the channel axis, shape (clip_length * C, img_h, img_w).
    """

    def __init__(self, video_folder):
        # img_h / img_w: target frame size; clip_length: frames per window.
        self.img_h = 256
        self.img_w = 256
        self.clip_length = 5
        # Deterministic frame order regardless of filesystem enumeration.
        self.imgs = sorted(glob.glob(video_folder + '/*.jpg'))

    def __len__(self):
        # The first (clip_length - 1) frames cannot anchor a full window,
        # so they are unpredictable and excluded from the count.
        return len(self.imgs) - (self.clip_length - 1)

    def __getitem__(self, indice):
        window = [np_load_frame(self.imgs[fid], self.img_h, self.img_w)
                  for fid in range(indice, indice + self.clip_length)]
        # Collapse the clip dimension into the channel dimension.
        return np.array(window).reshape((-1, self.img_h, self.img_w))
-
class Label_loader:
    """Load per-frame ground-truth anomaly labels for a testing dataset.

    :param dataset: name of the testing dataset, e.g. 'ped2', type=str
    :param test_data: path of the testing frames, e.g. '/home/huangchao/data/ped2/testing/', type=str
    :param mat_path: path of the labels file, e.g. '/home/huangchao/data/ped2/ped2.mat', type=str
    :param video_folders: per-video frame folders, one per test video, in order
    """

    def __init__(self, dataset, test_data, mat_path, video_folders):
        self.name = dataset
        self.frame_path = test_data
        self.mat_path = mat_path
        self.video_folders = video_folders

    def __call__(self):
        """Return a list of 1-D arrays, one per video; 1 marks an abnormal frame."""
        if self.name == 'shanghaitech':
            return self.load_shanghaitech()
        return self.load_ucsd_avenue()

    def load_ucsd_avenue(self):
        """Parse the UCSD/avenue-style .mat file of (start, end) abnormal ranges."""
        abnormal_events = scio.loadmat(self.mat_path, squeeze_me=True)['gt']

        all_gt = []
        for i in range(abnormal_events.shape[0]):
            # One label per frame actually present on disk for this video.
            length = len(os.listdir(self.video_folders[i]))
            sub_video_gt = np.zeros((length,), dtype=np.int8)

            one_abnormal = abnormal_events[i]
            if one_abnormal.ndim == 1:
                # A single (start, end) pair squeezes to 1-D; restore 2 x K shape.
                one_abnormal = one_abnormal.reshape((one_abnormal.shape[0], -1))

            for j in range(one_abnormal.shape[1]):
                # Ranges in the .mat file are 1-based and inclusive;
                # convert to 0-based, half-open slicing.
                start = one_abnormal[0, j] - 1
                end = one_abnormal[1, j]
                sub_video_gt[start: end] = 1

            all_gt.append(sub_video_gt)

        return all_gt

    def load_shanghaitech(self):
        """Load per-video frame-mask label arrays for shanghaitech."""
        # BUGFIX: the original read self.cfg.data_root, but this class has no
        # 'cfg' attribute, so this method always raised AttributeError.
        # Derive the dataset root (e.g. '/data/shanghaitech') from the testing
        # path ('/data/shanghaitech/testing/') instead.
        # NOTE(review): assumes 'frame_masks' sits beside 'testing' — confirm
        # the on-disk layout for shanghaitech.
        dataset_root = os.path.dirname(self.frame_path.rstrip('/'))
        np_list = sorted(glob.glob(f'{dataset_root}/frame_masks/*'))
        return [np.load(npy) for npy in np_list]
-
def psnr_error(gen_frames, gt_frames):
    """
    Computes the Peak Signal to Noise Ratio error between the generated images
    and the ground truth images.
    @param gen_frames: A tensor of shape [batch_size, 3, height, width]. The frames generated by the
                       generator model.
    @param gt_frames: A tensor of shape [batch_size, 3, height, width]. The ground-truth frames for
                      each frame in gen_frames.
    @return: A scalar tensor. The mean Peak Signal to Noise Ratio error over each frame in the
             batch.
    """
    shape = list(gen_frames.shape)
    num_pixels = (shape[1] * shape[2] * shape[3])
    # Map tanh-range outputs [-1, 1] back to [0, 1] before measuring error.
    # If the generator output is a sigmoid output, modify here.
    gt_frames = (gt_frames + 1.0) / 2.0
    gen_frames = (gen_frames + 1.0) / 2.0
    square_diff = (gt_frames - gen_frames) ** 2
    # BUGFIX: use torch.log10 instead of the hand-rolled log10 helper, whose
    # hard-coded .cuda() denominator crashed on CPU-only machines.
    batch_errors = 10 * torch.log10(1. / ((1. / num_pixels) * torch.sum(square_diff, [1, 2, 3])))
    return torch.mean(batch_errors)
-
def log10(t):
    """
    Calculates the base-10 log of each element in t.
    @param t: The tensor from which to calculate the base-10 log.
    @return: A tensor with the base-10 log of each element in t.
    """
    # BUGFIX: the original computed torch.log(t) / torch.log(tensor(10)).cuda(),
    # which crashes on CPU-only machines and forces a device transfer. The
    # native torch.log10 follows t's device and is numerically equivalent.
    return torch.log10(t)
-
def np_load_frame(filename, resize_h, resize_w):
    """Read an image file, resize it, and return it as float32 CHW in [-1, 1]."""
    frame = cv2.imread(filename)
    frame = cv2.resize(frame, (resize_w, resize_h)).astype('float32')
    frame = (frame / 127.5) - 1.0   # [0, 255] -> [-1, 1]
    frame = np.transpose(frame, [2, 0, 1])  # HWC -> (C, H, W)
    return frame
-
-
class Ano_Det():
    """End-to-end anomaly-detection evaluation.

    Predicts each frame from the 4 preceding frames with a trained U-Net
    generator, scores frames by prediction PSNR, then computes frame-level
    AUC against ground-truth labels. Requires CUDA and weights on disk.
    """

    def __init__(self, dataset, test_data, mat_path, trained_model):
        # dataset: dataset name, e.g. 'ped2'; test_data: testing frames root;
        # mat_path: ground-truth .mat file; trained_model: file under 'weights/'.
        self.dataset = dataset
        self.test_data = test_data
        self.mat_path = mat_path
        self.trained_model = trained_model
        # 12 input channels = 4 stacked 3-channel frames; the model predicts
        # the 5th frame (3 channels). Inference only, so eval mode.
        self.generator = UNet(input_channels=12, output_channel=3).cuda().eval()
        self.generator.load_state_dict(torch.load('weights/' + self.trained_model)['net_g'])
        print(f'The pre-trained generator has been loaded from \'weights/{self.trained_model}\'.\n')
        # One sub-folder of frames per test video, in deterministic order.
        self.video_folders = os.listdir(self.test_data)
        self.video_folders.sort()
        self.video_folders = [os.path.join(self.test_data, aa) for aa in self.video_folders]

    def __call__(self, *args, **kwargs):
        """Run detection over every test video and return the frame-level AUC."""
        fps = 0
        psnr_group = []  # one array of per-frame PSNRs per video

        with torch.no_grad():
            for i, folder in enumerate(self.video_folders):
                Dataset = test_dataset(folder)

                psnrs = []
                for j, clip in enumerate(Dataset):
                    # First 4 frames (channels 0:12) are the input; the 5th
                    # frame (channels 12:15) is the prediction target.
                    input_np = clip[0:12, :, :]
                    target_np = clip[12:15, :, :]
                    input_frames = torch.from_numpy(input_np).unsqueeze(0).cuda()
                    target_frame = torch.from_numpy(target_np).unsqueeze(0).cuda()

                    G_frame = self.generator(input_frames)
                    test_psnr = psnr_error(G_frame, target_frame).cpu().detach().numpy()
                    psnrs.append(float(test_psnr))

                    # Synchronize so the timing below measures completed GPU work.
                    torch.cuda.synchronize()
                    end = time.time()
                    if j > 1:  # Compute fps by calculating the time used in one completed iteration, this is more accurate.
                        fps = 1 / (end - temp)
                    temp = end
                    print(f'\rDetecting: [{i + 1:02d}] {j + 1}/{len(Dataset)}, {fps:.2f} fps.', end='')
                psnr_group.append(np.array(psnrs))

        print('\nAll frames were detected, begin to compute AUC.')
        gt_loader = Label_loader(self.dataset, self.test_data, self.mat_path, self.video_folders)  # Get gt labels.
        gt = gt_loader()
        assert len(psnr_group) == len(gt), f'Ground truth has {len(gt)} videos, but got {len(psnr_group)} detected videos.'

        scores = np.array([], dtype=np.float32)
        labels = np.array([], dtype=np.int8)
        for i in range(len(psnr_group)):
            # Min-max normalize each video's PSNRs into [0, 1].
            # NOTE(review): divides by max of the shifted array, which is 0 if a
            # video's PSNRs are all equal — presumably never happens in practice.
            distance = psnr_group[i]
            distance -= min(distance)  # distance = (distance - min) / (max - min)
            distance /= max(distance)
            scores = np.concatenate((scores, distance), axis=0)
            labels = np.concatenate((labels, gt[i][4:]), axis=0)  # Exclude the first 4 unpredictable frames in gt.

        assert scores.shape == labels.shape, \
            f'Ground truth has {labels.shape[0]} frames, but got {scores.shape[0]} detected frames.'
        # pos_label=0: a high normalized PSNR score indicates a normal (label 0) frame.
        fpr, tpr, thresholds = metrics.roc_curve(labels, scores, pos_label=0)
        auc = metrics.auc(fpr, tpr)
        print(f'AUC: {auc}\n')
        return auc
-
if __name__ == '__main__':
    # Evaluation configuration for a ped2 run; edit paths/weights as needed.
    dataset = 'ped2'
    test_data = '/home/huangchao/data/ped2/testing/'
    trained_model = 'ped2_40000.pth'
    mat_path = '/home/huangchao/data/ped2/ped2.mat'
    detector = Ano_Det(dataset, test_data, mat_path, trained_model)
    # Invoke the detector idiomatically (the original called c.__call__()).
    detector()
    print('ok')
-
-
|