|
- import os
- import os.path as osp
- from glob import glob
- import pandas as pd
- from collections import defaultdict
- import numpy as np
- import torch
- from tqdm import tqdm
- from torch import nn
- import torchvision
- from PIL import Image
- import time
- import torchvision.transforms.functional as TF
- from torch.utils.data import Dataset, DataLoader
- from torch.nn.utils import clip_grad_norm_
- from torchvision.transforms import InterpolationMode
- import argparse
- from torchvision.utils import make_grid
- from torch.autograd import Variable
- import scipy.signal as signal
- import matplotlib
- import matplotlib.cm as cm
- import io
- import gzip
- import bz2
- import lzma
- import PIL.Image as pimg
- import zlib
-
-
- from torch.utils.tensorboard import SummaryWriter
- import warnings
-
# Silence all warnings globally (PIL / torchvision emit noisy deprecation warnings).
warnings.simplefilter("ignore")

# NOTE(review): anomaly detection adds significant autograd overhead and only
# helps while debugging backward passes; this script performs no training, so
# this flag looks unnecessary — confirm before removing.
torch.autograd.set_detect_anomaly(True)

# Prefer GPU when available; CPU fallback otherwise.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
-
def colorize(tensor, vmin=0, vmax=0.4, cmap="turbo"):
    """Map a 2-D scalar array to an RGB image via a matplotlib colormap.

    Args:
        tensor: 2-D array-like of scalars (H, W).
        vmin, vmax: normalization range for the colormap.
        cmap: matplotlib colormap name.

    Returns:
        Float RGB array of shape (H, W, 3) with values in [0, 1]
        (the alpha channel from ``to_rgba`` is dropped).

    Raises:
        ValueError: if *tensor* is not 2-D.
    """
    # Explicit check instead of `assert`: asserts are stripped under `python -O`,
    # which would silently let badly-shaped input through.
    if tensor.ndim != 2:
        raise ValueError("colorize expects a 2-D array, got ndim=%d" % tensor.ndim)
    normalizer = matplotlib.colors.Normalize(vmin=vmin, vmax=vmax)
    mapper = cm.ScalarMappable(norm=normalizer, cmap=cmap)
    return mapper.to_rgba(tensor)[..., :3]
-
- # save images to tensorboard
# Log a (optionally colorized) image grid to TensorBoard.
def save_img(writer, tensor, tag, step, vmin=0, vmax=0.4, color=True):
    """Write *tensor* as an image to *writer* under *tag* at *step*.

    When *color* is True, channel 0 of the grid is rendered through
    ``colorize`` (H, W) -> (3, H, W); otherwise the raw grid is logged.
    """
    img = make_grid(tensor.detach(), nrow=1).cpu().numpy()  # CHW
    if color:
        # Take the first channel and map it to an RGB image.
        img = colorize(img[0], vmin, vmax).transpose(2, 0, 1)  # HW -> CHW
    writer.add_image(tag, img, step)
-
class MyDataset(Dataset):
    """Depth/mask evaluation dataset driven by CSV file lists.

    Each CSV row is expected to have at least three columns, where column 1
    is a path to a depth ``.npy`` file (H, W, 1) and column 2 a path to a
    mask ``.npy`` file (H, W).  Depth values are divided by *scale* and
    zeroed wherever the mask is 0.

    Args:
        label: must be ``'test'``; anything else loads nothing (kept for
            backward compatibility with the original behavior).
        scale: divisor applied to the raw depth values.
        csv_paths: optional list of CSV file paths; defaults to the two
            original validation lists so existing callers are unaffected.
    """

    def __init__(self, label='test', scale=100, csv_paths=None):
        super(MyDataset, self).__init__()
        if csv_paths is None:
            csv_paths = ['./data_list/val_indoors.csv', './data_list/val_outdoor.csv']
        self.depth_list = []
        self.mask_list = []
        self.scale = scale

        if label != 'test':
            print('error label != test !!!')
        else:
            # Read the CSVs only when actually loading, and collapse the two
            # duplicated per-file loops into one.
            for path in csv_paths:
                df = pd.read_csv(path, header=None)
                self.depth_list.extend(df.iloc[:, 1].tolist())
                self.mask_list.extend(df.iloc[:, 2].tolist())

        # Derive the count from the data instead of a hand-maintained counter.
        self.num = len(self.depth_list)
        print('finish loading', label, 'dataset:', self.num)

    def __len__(self):
        return self.num

    def __getitem__(self, idx):
        depth_path = self.depth_list[idx]
        mask_path = self.mask_list[idx]

        # (H, W, 1) -> (1, H, W), scaled down by self.scale.
        depth = np.load(depth_path).astype(np.float32).transpose(2, 0, 1) / self.scale
        mask = np.load(mask_path).astype(np.float32)  # (H, W)
        H, W = mask.shape
        mask = mask.reshape(1, H, W)  # (1, H, W)

        # Zero out depth wherever the mask is 0 (invalid regions).
        depth[mask == 0] = 0

        depth_tensor = torch.from_numpy(depth)
        mask_tensor = torch.from_numpy(mask)

        return [depth_tensor, mask_tensor]
-
-
-
def gzip_compress(images):
    """Return the size in bits of the gzip-compressed raw bytes of *images*."""
    raw = images.tobytes()
    return 8 * len(gzip.compress(raw))
-
def bz2_compress(images):
    """Return the size in bits of the bz2-compressed raw bytes of *images*."""
    raw = images.tobytes()
    return 8 * len(bz2.compress(raw))
-
def lzma_compress(images):
    """Return the size in bits of the LZMA-compressed raw bytes of *images*."""
    raw = images.tobytes()
    return 8 * len(lzma.compress(raw))
-
def zlib_compress(images):
    """Return the size in bits of the zlib-compressed raw bytes of *images*."""
    raw = images.tobytes()
    return 8 * len(zlib.compress(raw))
-
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--scale", type=float, default=100, help="Scale to modify the depth")
    parser.add_argument("--batch_size", type=int, default=1, help="Size of the batches")
    parser.add_argument("--qstep_residual", type=float, default=1000, help="Residual precision")
    parser.add_argument("--log_path", type=str, default='./benchmark_result/', help="Dir path to save log")

    opt = parser.parse_args()
    scale = opt.scale
    batch_size = opt.batch_size
    qstep_residual = opt.qstep_residual

    # The per-sample indexing below (batch_x[0, 0, ...]) only works one
    # image at a time, so larger batches are rejected up front.
    if batch_size != 1:
        print('batch_size!=1')
        import sys
        sys.exit(0)

    os.makedirs(opt.log_path, exist_ok=True)
    # os.path.join instead of string concatenation; open(..., 'w') below
    # creates the file, so the old `os.system("touch ...")` shell-out is gone.
    log_path = os.path.join(opt.log_path, 'benchmark_result.txt')

    test_data = MyDataset(label='test', scale=scale)
    test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False, num_workers=16)

    # Total compressed bits per codec, and the total number of pixels seen.
    gzip_bits_total = 0
    bz2_bits_total = 0
    lzma_bits_total = 0
    zlib_bits_total = 0
    total_pixels = 0

    for batch_x, mask in tqdm(test_loader):
        # batch_x / mask: [batch_size, 1, H, W]
        # Undo the dataset's scaling, then quantize to qstep_residual steps
        # (e.g. 1000 keeps three decimal places).
        batch_x = torch.round(batch_x * scale * qstep_residual)

        im1 = batch_x[0, 0, ...].numpy().astype(np.uint32)  # [H, W]

        gzip_bits_total += gzip_compress(im1)
        bz2_bits_total += bz2_compress(im1)
        lzma_bits_total += lzma_compress(im1)
        zlib_bits_total += zlib_compress(im1)

        # Accumulate the actual pixel count instead of assuming every image
        # is 1024x768 (the old hard-coded divisor was wrong for any other size).
        total_pixels += im1.size

    gzip_bits_avg = gzip_bits_total / total_pixels
    bz2_bits_avg = bz2_bits_total / total_pixels
    lzma_bits_avg = lzma_bits_total / total_pixels
    zlib_bits_avg = zlib_bits_total / total_pixels

    result = ('benchmark gzip_bits_avg: %.4f bz2_bits_avg: %.4f lzma_bits_avg: %.4f zlib_bits_avg: %.4f'
              % (gzip_bits_avg, bz2_bits_avg, lzma_bits_avg, zlib_bits_avg))
    print(result)
    with open(log_path, 'w') as f:
        # f.write, not f.writelines: the payload is a single string.
        f.write(result)
|