|
- '''
- Evaluation metrics by ZRN
- '''
- import torch
- from scipy import *
- import numpy as np
- import torch.nn.functional as F
- from geomloss import SamplesLoss
- from models.emd.emd import earth_mover_distance
- import matplotlib.pyplot as plt
-
- from im2mesh.utils.libkdtree import KDTree
-
- plt.switch_backend('agg')
-
class Eval(object):
    """Collection of evaluation metrics for generative point-cloud models:
    EMD, kd-tree coverage / F-score, NDiv loss, KL/JS divergence, Sinkhorn
    distance, 3-D mode/quality checks, and latent-code visualisation."""
    def __init__(self):
        # earth-mover-distance callable provided by the project's EMD module
        self.emd = earth_mover_distance
- '''NDiv_loss, comes from paper Normalized Diversification'''
- def compute_pairwise_distance(self, x):
- ''' computation of pairwise distance matrix
- ---- Input
- - x: input tensor torch.Tensor [(bs), sample_num, dim_x]
- ---- Return
- - matrix: output matrix torch.Tensor [(bs), sample_num, sample_num]
- '''
- if len(x.shape) == 2:
- matrix = torch.norm(x[:, None, :] - x[None, :, :], p=2, dim=2)
- elif len(x.shape) == 3:
- matrix = torch.norm(x[:, :, None, :] - x[:, None, :, :], p=2, dim=3)
- else:
- raise NotImplementedError
- return matrix
- def compute_norm_pairwise_distance(self, x):
- ''' computation of normalized pairwise distance matrix
- ---- Input
- - x: input tensor torch.Tensor [(bs), sample_num, dim_x]
- ---- Return
- - matrix: output matrix torch.Tensor [(bs), sample_num, sample_num]
- '''
- x_pair_dist = self.compute_pairwise_distance(x)
- normalizer = torch.sum(x_pair_dist, dim=-1)
- x_norm_pair_dist = x_pair_dist / (normalizer[..., None] + 1e-12).detach()
- return x_norm_pair_dist
-
- def coverage(self, points_src, points_tgt):
- kdtree = KDTree(points_tgt)
- dist, idx = kdtree.query(points_src)
- uni_idx = np.unique(idx)
- tgt_num = points_tgt.shape[0]
- return uni_idx.shape[0] / tgt_num
-
- def f_score_coverage(self, pointcloud, pointcloud_tgt):
- accuracy = self.coverage(pointcloud, pointcloud_tgt)
- completeness = self.coverage(pointcloud_tgt, pointcloud)
- return 2 * accuracy * completeness / (accuracy + completeness)
-
-
- def NDiv_loss(self, z, y, alpha=0.8):
- ''' NDiv loss function.
- ---- Input
- - z: latent samples after embedding h_Z: torch.Tensor [(bs), sample_num, dim_z].
- - y: corresponding outputs after embedding h_Y: torch.Tensor [(bs), sample_num, dim_y].
- - alpha: hyperparameter alpha in NDiv loss.
- ---- Return
- - loss: normalized diversity loss. torch.Tensor [(bs)]
- '''
- S = z.shape[-2] # sample number
- y_norm_pair_dist = self.compute_norm_pairwise_distance(y)
- z_norm_pair_dist = self.compute_norm_pairwise_distance(z)
- ndiv_loss_matrix = F.relu(z_norm_pair_dist * alpha - y_norm_pair_dist)
- ndiv_loss = ndiv_loss_matrix.sum(-1).sum(-1) / (S * (S - 1))
- return ndiv_loss
-
- '''KL divergence'''
- def KL(self, p_output, q_output, get_softmax=True):
- KL_criterion = torch.nn.KLDivLoss(reduction='batchmean')
- if get_softmax:
- p_output = F.softmax(p_output)
- q_output = F.softmax(q_output)
- loss = KL_criterion(p_output.log(), q_output)
- return loss
-
- '''JS divergence'''
- def JS_divergence(self, p_output, q_output, get_softmax=True):
- """
- Function that measures JS divergence between target and output logits:
- """
- KLDivLoss = torch.nn.KLDivLoss(reduction='batchmean')
- if get_softmax:
- p_output = F.softmax(p_output)
- q_output = F.softmax(q_output)
- log_mean_output = ((p_output + q_output) / 2).log()
- return (KLDivLoss(log_mean_output, p_output) + KLDivLoss(log_mean_output, q_output)) / 2
-
- '''Sinkhorn loss, added entropy to approximate W-distance'''
- def Sinkhorn(self, pc, data):
- #loss = SamplesLoss(loss='sinkhorn', p=2, blur=.05)
- loss = SamplesLoss(loss='sinkhorn')
- L = loss(pc, data)
- return L
-
- def show_assignments(self, a, b, P, path):
- norm_P = P/P.max()
- for i in range(a.shape[0]):
- for j in range(b.shape[0]):
- plt.arrow(a[i,0], a[i,1], a[i,2], b[j,0]-a[i,0], b[j,1]-a[i,1], b[j,2]-a[i,2], alpha=norm_P[i,j].item())
- plt.title('Assignment')
- plt.scatter(a[:,0], a[:,1], a[:,2])
- plt.scatter(b[:,0], b[:,1], b[:,2])
- plt.axis('off')
- plt.savefig(path)
-
- '''check mode and quality of datasets that has GT'''
- def check_mode_3D(self,pc, data, th):
- '''
- input- (250, 3)
- output:
- - valid_mode: (n*n)
- '''
- data = data.detach().cpu().numpy()
- S = np.shape(data)[0] #self.data_points
- valid_mode = np.zeros(S)
- for index in range(S):
- x = data[index][0]
- y = data[index][1]
- z = data[index][2]
- valid_x = np.logical_and((pc[:, 0] > x - th), (pc[:, 0] < x + th))
- valid_y = np.logical_and((pc[:, 1] > y - th), (pc[:, 1] < y + th))
- valid_z = np.logical_and((pc[:, 2] > z - th), (pc[:, 2] < z + th))
- valid = np.logical_and(valid_x, valid_y, valid_z)
- valid_mode[int(index)] = np.any(valid)
- return valid_mode
- def check_quality_3D(self, pc, data, th):
- '''
- input- (250, 3)
- output:
- - count_quality: (1)
- '''
- data = data.detach().cpu().numpy()
- S = np.shape(data)[0]
- N = np.shape(pc)[0]
- res = np.zeros(N)
- for index in range(S):
- x = data[index][0]
- y = data[index][1]
- z = data[index][2]
- valid_x = np.logical_and((pc[:, 0] > x - th), (pc[:, 0] < x + th))
- valid_y = np.logical_and((pc[:, 1] > y - th), (pc[:, 1] < y + th))
- valid_z = np.logical_and((pc[:, 2] > y - th), (pc[:, 2] < z + th))
- valid = np.logical_and(valid_x, valid_y, valid_z)
- # 计算相等的个数用sum
- res = np.logical_or(res, valid)
- count_quality = res.sum()
- return count_quality
    def evaluate_mode(self, numGen, data, th):  # for mode coverage
        '''Estimate mode coverage: how many ground-truth modes are hit by
        randomly generated samples.

        ---- Input
        - numGen: number of samples generated per round
        - data: pair (P, I_gen); P holds ground-truth points and
          I_gen [2, numGen] holds index pairs used to interpolate samples
          # NOTE(review): "generation" here is random linear interpolation
          # between ground-truth point pairs — confirm this matches the
          # intended sampler.
        - th: acceptance-box half-width forwarded to check_mode_3D
        ---- Return
        - mean covered-mode count over 10 repetitions (mean/std also printed)
        '''
        count_list = []
        P=data[0]
        I_gen = data[1]
        for i in range(10):  # 10 independent repetitions for mean/std
            mode = np.zeros(np.shape(P)[0])
            #print('P.shape:', np.shape(P))
            for j in range(100):  # 100 generation rounds per repetition
                # obtain latent sample: random convex combination of point pairs
                rand_w = torch.rand([numGen, 1], dtype=torch.float64)
                # print('rand_w:', rand_w)
                output = torch.mul(P[I_gen[0, :], :], rand_w) + torch.mul(P[I_gen[1, :], :], 1 - rand_w)
                output = output.detach().cpu().numpy()
                mode_ = self.check_mode_3D(output, P, th)
                # accumulate modes hit in any round of this repetition
                mode = np.logical_or(mode, mode_)
            count = mode.sum()
            count_list.append(count)
        count_list = np.array(count_list)
        num_modes = count_list
        print('th = {0:.2f}, #mode = {1:.2f}, std = {2:.2f}'.format(th, np.mean(num_modes), np.std(num_modes)))
        return np.mean(num_modes)
    def evaluate_quality(self, numGen, data, th):
        '''Estimate sample quality: fraction of generated points that land
        inside the acceptance box of at least one ground-truth mode.

        ---- Input
        - numGen: number of samples generated per round
        - data: pair (P, I_gen) as in evaluate_mode
        - th: acceptance-box half-width forwarded to check_quality_3D
        ---- Return
        - mean success count per repetition divided by numGen (also printed)
        '''
        P = data[0]
        I_gen = data[1]
        count_list = []
        for i in range(10):  # 10 independent repetitions for mean/std
            count = 0
            for j in range(100):  # 100 generation rounds per repetition
                rand_w = torch.rand([numGen, 1], dtype=torch.float64)
                # print('rand_w:', rand_w)
                output = torch.mul(P[I_gen[0, :], :], rand_w) + torch.mul(P[I_gen[1, :], :], 1 - rand_w)
                output = output.detach().cpu().numpy()
                num = self.check_quality_3D(output, P, th)
                count = count + num
            count_list.append(count)
        count_list = np.array(count_list)
        #print('count_list:', count_list)
        # NOTE(review): `count` sums over 100 rounds, so dividing by numGen
        # gives 100x the per-round rate — confirm the intended normalisation
        # (original comment: numGen:10000, original 100*count_list/25000, 25000 = 100*250).
        quality = count_list / numGen
        print('th = {0:.2f}, success_rate = {1:.2f}, std = {2:.2f}'.format(th, np.mean(quality), np.std(quality)))
        return np.mean(quality)
-
    '''visualization of latent code in two modes: TSNE and UMAP'''
    '''four or eight classes'''

    def Vis_latent_code(self, numOfClasses, input_file_path, final_save_path, labels, flag='train'):
        '''Plot side-by-side TSNE / UMAP 2-D projections of saved latent
        features, one colour per class, and save the figure as a PNG.

        ---- Input
        - numOfClasses: number of classes to colour (4 or 8, selects legend)
        - input_file_path: path of a torch-saved feature tensor
          # assumes the tensor is on CPU (feature.numpy()) — TODO confirm
        - final_save_path: prefix of the output PNG path
        - labels: per-sample class-index tensor, same length as the features
        - flag: 'train' / 'test'; any other value saves with the '_gen' suffix
        '''
        '''import related libs'''
        import glob
        from MulticoreTSNE import MulticoreTSNE as TSNE
        import umap
        import matplotlib.pyplot as plt
        plt.switch_backend('agg')  # headless rendering

        labels = labels.cpu().numpy().tolist() # tensor object has no attribute index
        if flag=='train':
            save_path = final_save_path + str(numOfClasses) + '_train.png'
        elif flag =='test':
            save_path = final_save_path + str(numOfClasses) + '_test.png'
        else:
            save_path = final_save_path + str(numOfClasses) + '_gen.png'

        feature = torch.load(input_file_path)
        feature = feature.numpy()

        #feature = [x for x in feature if str(x) != 'nan' and str(x) != 'inf'] # drop inf values

        '''reduction dimensions and visualization'''

        '''UMAP'''
        embeddings_umap = umap.UMAP().fit_transform(feature)
        umap_x = embeddings_umap[:, 0]
        umap_y = embeddings_umap[:, 1]

        '''TSNE'''
        embeddings_tsne = TSNE(n_jobs=4).fit_transform(feature)
        tsne_x = embeddings_tsne[:, 0]
        tsne_y = embeddings_tsne[:, 1]

        fig, ax = plt.subplots(1, 2, sharey=True, figsize=(14, 7))
        for j in range(numOfClasses):
            indx = [i for i, x in enumerate(labels) if x == j] # acquire the indexes of the same classes.
            color = plt.cm.Set1(j)
            #print('lables',indx)
            ax[0].scatter(tsne_x[indx], tsne_y[indx], color=color, marker='.', label='TSNE')
            ax[0].set_title('TSNE')
            ax[0].axis('equal')
            ax[1].scatter(umap_x[indx], umap_y[indx], color=color, marker='.', label='UMAP')
            ax[1].set_title('UMAP')
            ax[1].axis('equal')
        # NOTE(review): legend entries are positional class names while the
        # scatter handles are labelled 'TSNE'/'UMAP' — verify colours line up
        if numOfClasses ==4:
            plt.legend(["airplane","cabinet", "chair", "table"])
        else:
            plt.legend(["airplane", "cabinet", "chair", "table", "bench", "car","display","sofa"])
        '''airplane, cabinet, chair, table
        bench, car, display, sofa'''
        plt.xlim((-60, 60)) # pay attention here, different methods have different scales
        plt.ylim((-60, 60))
        plt.savefig(save_path)
        plt.close()
-
- def Vis_latent_code_one(self, numOfClasses, input_file_path, final_save_path, name, flag='train'):
- '''iport related libs'''
- import glob
- from MulticoreTSNE import MulticoreTSNE as TSNE
- import umap
- import matplotlib.pyplot as plt
- plt.switch_backend('agg')
- labels = ['airplane', 'cabinet', 'chair', 'table', 'bench', 'car', 'display', 'sofa']
- c = labels.index(name)
- if flag=='train':
- save_path = final_save_path + str(numOfClasses) + '_' + str(name) +'_train.png'
- elif flag =='test':
- save_path = final_save_path + str(numOfClasses) + '_' + str(name) +'_test.png'
- else:
- save_path = final_save_path + str(numOfClasses) + '_' + str(name) + '_gen.png'
-
- feature = torch.load(input_file_path)
- feature = feature.numpy()
-
- '''reduction dimensions and visualization'''
-
- '''UMAP'''
- embeddings_umap = umap.UMAP().fit_transform(feature)
- umap_x = embeddings_umap[:, 0]
- umap_y = embeddings_umap[:, 1]
-
- '''TSNE'''
- embeddings_tsne = TSNE(n_jobs=4).fit_transform(feature)
- tsne_x = embeddings_tsne[:, 0]
- tsne_y = embeddings_tsne[:, 1]
-
- fig, ax = plt.subplots(1, 2, sharey=True, figsize=(14, 7))
- color = plt.cm.Set1(c)
- ax[0].scatter(tsne_x, tsne_y, color=color, marker='.', label='TSNE')
- ax[0].set_title('TSNE')
- ax[0].axis('equal')
- ax[1].scatter(umap_x, umap_y, color=color, marker='.', label='UMAP')
- ax[1].set_title('UMAP')
- ax[1].axis('equal')
- plt.xlim((-60, 60)) # pay attention here,different method is different
- plt.ylim((-60, 60))
- #plt.title(name)
- plt.savefig(save_path)
- plt.close()
-
- def Vis_latent_code_z(self, numOfClasses, z, final_save_path,name):
- '''iport related libs'''
- import glob
- from MulticoreTSNE import MulticoreTSNE as TSNE
- import umap
- import matplotlib.pyplot as plt
- plt.switch_backend('agg')
- feature = z
- feature = feature.cpu().numpy()
- #print('features:',np.shape(feature))
-
- '''reduction dimensions and visualization'''
-
- '''UMAP'''
- embeddings_umap = umap.UMAP().fit_transform(feature)
- umap_x = embeddings_umap[:, 0]
- umap_y = embeddings_umap[:, 1]
-
- '''TSNE'''
- embeddings_tsne = TSNE(n_jobs=4).fit_transform(feature)
- tsne_x = embeddings_tsne[:, 0]
- tsne_y = embeddings_tsne[:, 1]
-
- fig, ax = plt.subplots(1, 2, sharey=True, figsize=(14, 7))
- #for j in range(numOfClasses):
- color = plt.cm.Set1(0)
- ax[0].scatter(tsne_x, tsne_y, color=color, marker='.', label='TSNE')
- ax[0].set_title('TSNE')
- ax[0].axis('equal')
- ax[1].scatter(umap_x, umap_y, color=color, marker='.', label='UMAP')
- ax[1].set_title('UMAP')
- ax[1].axis('equal')
- plt.xlim((-60, 60)) # pay attention here,different method is different
- plt.ylim((-60, 60))
- save_path = final_save_path + '/'+ str(numOfClasses) + name
- plt.savefig(save_path)
- plt.close()
|