|
- import os
- import os.path as osp
- from glob import glob
- from collections import defaultdict
- import numpy as np
- import torch
- from tqdm import tqdm
- from torch import nn
- import torchvision
- from PIL import Image
- import time
- import torchvision.transforms.functional as TF
- from torch.utils.data import Dataset, DataLoader
- from torchvision.transforms import InterpolationMode
- import argparse
- from torchvision.utils import make_grid
- from torch.autograd import Variable
- import matplotlib
- import matplotlib.cm as cm
- import scipy.signal as signal
-
- from torch.utils.tensorboard import SummaryWriter
- import warnings
-
- warnings.simplefilter("ignore")
-
- device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
-
- from scipy import stats
- import pandas as pd
-
class MyDataset(Dataset):
    """Validation dataset that yields masked depth maps split into two planes.

    File lists are read from two header-less CSV files (indoor and outdoor).
    In each CSV, column 1 holds the path of a depth ``.npy`` file and column 2
    the path of the matching mask ``.npy`` file.

    Each item is a float32 tensor of shape ``[2, H, W]``:

    * channel 0: ``floor(scale * 1000 * depth) % 512``, divided by ``scale * 1000``
    * channel 1: ``floor(scale * 1000 * depth / 512)``, divided by ``scale * 1000``

    where ``depth`` is the stored depth divided by ``scale``. Pixels whose mask
    value is 0 are set to 0 in both channels.

    Args:
        label: Only ``'test'`` is supported. Any other value prints an error
            and leaves the dataset empty.
        scale: Divisor applied to the stored depth values.
        indoors_csv: Path of the indoor list file.
        outdoor_csv: Path of the outdoor list file.
    """

    def __init__(self, label='test', scale=100,
                 indoors_csv='./data_list/val_indoors.csv',
                 outdoor_csv='./data_list/val_outdoor.csv'):
        super(MyDataset, self).__init__()
        self.depth_list = []
        self.mask_list = []
        self.num = 0
        self.scale = scale

        # Both lists are read up front, so a missing list file fails right
        # here regardless of the label.
        indoors_data = pd.read_csv(indoors_csv, header=None)
        outdoor_data = pd.read_csv(outdoor_csv, header=None)

        if label != 'test':
            print('error label != test !!!')
        else:
            # Indoor samples come first, followed by the outdoor ones.
            for frame in (indoors_data, outdoor_data):
                self.depth_list.extend(frame.iloc[:, 1].tolist())
                self.mask_list.extend(frame.iloc[:, 2].tolist())
            self.num = len(self.depth_list)

        print('finish loading', label, 'dataset:', self.num)

    def __len__(self):
        """Return the number of (depth, mask) pairs that were loaded."""
        return self.num

    def __getitem__(self, idx):
        """Load sample ``idx`` and return its two-plane ``[2, H, W]`` tensor."""
        depth_path = self.depth_list[idx]
        mask_path = self.mask_list[idx]

        # The channel axis is moved to the front; together with the mask
        # indexing below this requires the file to hold an [H, W, 1] array.
        depth = np.load(depth_path).astype(np.float32).transpose(2, 0, 1) / self.scale  # [1, H, W]

        # Quantisation factor shared by both planes.
        quant = self.scale * 1000
        # Low part: quantised level modulo 512.
        depth_1 = ((np.floor(quant * depth) % 512) / quant).astype(np.float32)
        # High part: quantised level divided by 512, rounded down.
        depth_2 = (np.floor(quant * depth / 512) / quant).astype(np.float32)

        mask = np.load(mask_path).astype(np.float32)  # [H, W]
        height, width = mask.shape
        mask = mask.reshape(1, height, width)  # [1, H, W]

        # Zero out the regions where the mask is 0.
        invalid = mask == 0
        depth_1[invalid] = 0
        depth_2[invalid] = 0

        # Stack the two float32 planes directly instead of going through a
        # float64 scratch buffer.
        img = np.concatenate([depth_1, depth_2], axis=0)  # [2, H, W]
        return torch.from_numpy(img)
-
-
def _bits_per_pixel(likelihoods, num_pixels):
    """Return the rate estimate ``-sum(log2(likelihoods)) / num_pixels``."""
    return torch.sum(torch.log(likelihoods)) / (-np.log(2) * num_pixels)


if __name__ == "__main__":
    import sys

    # Evaluation script: runs the stored compression model over crops of
    # every validation sample and reports the average bits per pixel.
    parser = argparse.ArgumentParser()
    parser.add_argument("--scale", type=float, default=1, help="Scale to modify the depth")
    parser.add_argument("--batch_size", type=int, default=1, help="Size of the batches")
    parser.add_argument("--crop_width", type=int, default=1024, help="crop_width")
    parser.add_argument("--crop_height", type=int, default=768, help="crop_height")
    parser.add_argument("--resume_1", type=str,
                        default='./model/2021_12_25__14_54_34/compression_model_39_3.64220591.pkl',
                        help="Dir path to load trained compression_model")
    # Accepted for command-line compatibility; not used by this script.
    parser.add_argument("--qstep_residual", type=float, default=1000, help="Residual precision")

    opt = parser.parse_args()
    scale = opt.scale
    batch_size = opt.batch_size
    resume_1 = opt.resume_1
    crop_width = opt.crop_width
    crop_height = opt.crop_height

    if batch_size != 1:
        print('batch_size!=1')
        # Unsupported configuration: report failure through the exit status.
        sys.exit(1)

    test_data = MyDataset(label='test', scale=scale)
    test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False, num_workers=16)

    # NOTE(review): torch.load unpickles a whole model object, which executes
    # arbitrary code from the file. Only load checkpoints from a trusted source.
    # NOTE(review): the model is never switched to eval mode here. Confirm
    # whether compression_model.eval() is needed for the reported numbers.
    compression_model = torch.load(resume_1, map_location=torch.device('cpu')).to(device)

    currentBPP = 0
    e1BPP = 0
    e2BPP = 0
    residualBPP = 0

    test_steps = 0

    for step, batch_x in enumerate(test_loader):
        # batch_x is [batchsize, 2, H, W] (MyDataset yields two planes).
        # It is cut into non-overlapping [batchsize, 2, crop_height, crop_width]
        # crops; any remainder at the right/bottom border is not evaluated.
        h_num = batch_x.size()[2] // crop_height
        w_num = batch_x.size()[3] // crop_width

        print( " h_num:", h_num, "w_num:",w_num,)

        num_crops = w_num * h_num
        if num_crops == 0:
            # The sample is smaller than the crop window. Skip it instead of
            # dividing by zero in the timing report below.
            print('skip step %d: input is smaller than the crop window' % step)
            continue

        start = time.time()

        for i in range(w_num):
            for j in range(h_num):
                input_x = batch_x[:, :,
                                  j * crop_height:(j + 1) * crop_height,
                                  i * crop_width:(i + 1) * crop_width]

                num_pixels = input_x.size()[0] * input_x.size()[2] * input_x.size()[3]
                input_x = input_x.to(device)

                with torch.no_grad():
                    # Only the last three outputs (likelihood tensors) feed
                    # the rate computation.
                    _, _, _, xp1, xp2, xp_residual_1 = compression_model(input_x, False)

                test_bpp1 = _bits_per_pixel(xp1, num_pixels)
                test_bpp2 = _bits_per_pixel(xp2, num_pixels)
                test_bpp1_residual = _bits_per_pixel(xp_residual_1, num_pixels)

                l_rec = test_bpp1 + test_bpp2 + test_bpp1_residual

                print('test step: %d i: %d j: %d bpp_total: %.4f e1_loss: %.4f e2_loss: %.4f e1_loss_residual: %.4f'
                      % (step, i, j, l_rec.item(), test_bpp1.item(), test_bpp2.item(), test_bpp1_residual.item()))

                test_steps = test_steps + 1
                currentBPP = currentBPP + l_rec.item()
                e1BPP = e1BPP + test_bpp1.item()
                e2BPP = e2BPP + test_bpp2.item()
                residualBPP = residualBPP + test_bpp1_residual.item()

        end = time.time()
        # Average wall-clock time per crop for this sample.
        print('time: ', (end - start) / num_crops)

    if test_steps == 0:
        # Empty dataset, or every sample was smaller than the crop window.
        print('no crops were evaluated; cannot compute average bpp')
        sys.exit(1)

    currentBPP = currentBPP / test_steps
    e1BPP = e1BPP / test_steps
    e2BPP = e2BPP / test_steps
    residualBPP = residualBPP / test_steps

    print('total average test_bpp: %.4f e1_bpp: %.4f e2_bpp: %.4f residual_bpp: %.4f ' %(currentBPP, e1BPP, e2BPP, residualBPP))
|