|
# -*- coding: utf-8 -*-
- '''
- # @time:2023/3/3 15:53
- # Author:DFTL
- # @File:visualizesion.py
- '''
- # import torch
- # from torch.utils.tensorboard import SummaryWriter
- # import torchvision
- #
- # writer_img = SummaryWriter()
- # # writer_text = SummaryWriter()
- # # writer_mix = SummaryWriter()
- #
- # image_feature = torch.randn([1000,64])
- # label = torch.randint(0,13,[1000])
- # # text_feature = torch.randn([1000,64])
- # # mix_feature = torch.cat((image_feature,text_feature),dim=0)
- # # mix_label = torch.cat((label,label),dim=0)
- # writer_img.add_embedding(mat=image_feature,metadata=label)
- # # writer_text.add_embedding(mat=text_feature,metadata=label)
- # # writer_mix.add_embedding(mat=mix_feature,metadata=mix_label)
- #
- #
- #
- # writer_img.close()
- # # writer_text.close()
- # # writer_mix.close()
-
- import os
- from tqdm import tqdm
- import json
- import argparse
- import numpy as np
- import matplotlib.pyplot as plt
- from sklearn import datasets
- from sklearn.manifold import TSNE
- import torch
- from utils.Dataset_df import MyDataset_v2,MyDataset
- from torch.utils.data import DataLoader
- from torch.nn import CrossEntropyLoss
- from transformers import BertTokenizer,BertModel
-
- import Module
-
-
# Run on the first GPU when available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
-
- # class MyDataset_v2(Dataset):
- # def __init__(self,image_path,label_path,triples_path):
- # self.image_data = np.load(image_path)
- # self.label_data = np.load(label_path)
- # with open(triples_path, 'r', encoding='utf-8') as f:
- # self.knowledge_dict = json.load(f)
-
- # def __getitem__(self, item):
- # image = self.image_data[item, :, :, :]
- # image = np.transpose(image,(2,0,1))
-
- # label = self.label_data[item]
-
- # triples = self.knowledge_dict[str(label.astype(int))]
- # event = triples[0][0] + triples[0][1] + triples[0][2]
-
- # return torch.tensor(image).float(), torch.tensor(label), event
-
- # def __len__(self):
- # return self.image_data.shape[0]
-
# Load the demo dataset
def get_data():
    """Load the sklearn handwritten-digits dataset.

    :return: feature matrix, labels, number of samples, number of features
    """
    digits = datasets.load_digits(n_class=10)
    features = digits.data        # image features
    targets = digits.target       # image labels
    sample_count, feature_count = features.shape
    return features, targets, sample_count, feature_count
-
# Normalize the samples and draw each point as its class label
def plot_embedding(data, label, title):
    """Plot 2-D embedded samples, drawing each point as its class label.

    :param data: (n_samples, 2) embedded coordinates
    :param label: per-sample integer class labels
    :param title: figure title
    :return: the matplotlib Figure
    """
    x_min, x_max = np.min(data, 0), np.max(data, 0)
    # Min-max normalise each axis into [0, 1]; guard against a constant
    # axis, which would otherwise divide by zero and yield NaN positions.
    span = np.where((x_max - x_min) == 0, 1, x_max - x_min)
    data = (data - x_min) / span
    fig = plt.figure()
    plt.subplot(111)
    for i in range(data.shape[0]):
        # Draw the class id at the point's position, coloured by class.
        # The /12 scaling maps labels into the colormap's [0, 1] range —
        # assumes labels lie in [0, 12]; TODO confirm against the dataset.
        plt.text(data[i, 0], data[i, 1], str(label[i]), style="italic",
                 color=plt.cm.Set1(label[i] / 12),
                 fontdict={"weight": "bold", "size": 10})
    plt.xticks()
    plt.yticks()
    plt.title(title, fontsize=14)
    return fig
-
# Run t-SNE dimensionality reduction, plot, and save the figure
def main(data, label, save_path):
    """Project *data* to 2-D with t-SNE, plot it, and save the figure.

    :param data: high-dimensional feature matrix
    :param label: per-sample class labels
    :param save_path: path the rendered figure is written to
    """
    print("Starting compute t-SNE Embedding...")
    ts = TSNE(n_components=2, init="pca", random_state=0)
    # t-SNE reduction to 2-D (fixed seed for reproducible layouts)
    result = ts.fit_transform(data)
    plot_embedding(result, label, "t-SNE Embedding of digits")
    plt.savefig(save_path)
    plt.show()
-
def getKnowledge(knowledge_dict,labels):  # fetch three knowledge triples per label
    """Look up each label's triples and join the first three into strings.

    :param knowledge_dict: str(label) -> list of (subject, relation, object) triples
    :param labels: iterable of tensor labels
    :return: three parallel lists of concatenated triple strings
    """
    first, second, third = [], [], []
    for lab in labels:
        # Labels arrive as tensors; the dict is keyed by the int's string form.
        triples = knowledge_dict[str(lab.int().item())]
        first.append("".join(triples[0][:3]))
        second.append("".join(triples[1][:3]))
        third.append("".join(triples[2][:3]))
    return first, second, third
-
def getTokenizer(bert_base_chinese_path):
    """Load the tokenizer and BERT encoder from a local checkpoint directory.

    :param bert_base_chinese_path: path to the pretrained bert-base-chinese files
    :return: (tokenizer, BertModel moved to the active device)
    """
    encoder = BertModel.from_pretrained(bert_base_chinese_path).to(device)
    tokenizer = BertTokenizer.from_pretrained(bert_base_chinese_path)
    return tokenizer, encoder
-
def getEncodedtext(triples,max_len,tokenizer,bert):
    """Encode one triple string with BERT and return its pooled representation.

    :param triples: text to encode
    :param max_len: truncation length for tokenisation
    :param tokenizer: HuggingFace tokenizer
    :param bert: BertModel already on the active device
    :return: pooled BERT output for the single-item batch
    """
    token_ids = tokenizer(triples, max_length=max_len, truncation=True)['input_ids']
    batch = torch.tensor([token_ids], dtype=torch.long).to(device)
    # Index 1 of the BertModel output tuple is the pooled representation.
    pooled = bert(batch)[1]
    return pooled
-
def test(model,train_dataset,args):
    """Run *model* over *train_dataset* and collect, via a forward hook,
    the 64-d features produced by ``model.semantic_token``.

    :param model: trained module exposing ``semantic_token`` and
        ``event_embeding`` sub-modules
    :param train_dataset: DataLoader yielding (image, label) batches
    :param args: namespace with ``pretrained_bert``, ``knowledge_path``
        and ``max_len``
    :return: (features tensor [N, 64], labels tensor [N])
    """
    model.eval()

    tokenizer, bert = getTokenizer(args.pretrained_bert)

    loss_ = CrossEntropyLoss()

    with open(args.knowledge_path,'r',encoding='utf-8') as f:
        knowledge_dict = json.load(f)

    # Dummy first rows so torch.cat has something to append to;
    # they are stripped off before returning.
    data_ = torch.zeros([1,64])
    label_ = torch.zeros([1])

    '''capture intermediate activations with forward hooks'''
    features = []
    feature_event = []

    def hook_image(net, input, output):
        features.append(output)

    def hook_event(net, input, output):
        feature_event.append(output)

    # Keep the hook handles so they can be removed afterwards; otherwise
    # repeated calls to test() would stack duplicate hooks on the model
    # and features[0] would no longer be the current batch's output.
    h_image = model.semantic_token.register_forward_hook(hook_image)
    h_event = model.event_embeding.register_forward_hook(hook_event)

    try:
        with torch.no_grad():

            for data in tqdm(train_dataset):

                # image batch
                image, label = data
                image = image.to(device)
                label = label.to(device)

                t1,t2,t3 = getKnowledge(knowledge_dict,label)

                # Encode the first triple per label with the pretrained BERT.
                encodedevent1 = torch.cat([getEncodedtext(t,args.max_len,tokenizer,bert) for t in t1]).to(device)

                # Forward pass is run only for its hook side effects.
                model(loss_, label, image, encodedevent1)

                data_ = torch.cat([data_,features[0]],dim=0)
                label_ = torch.cat([label_,label],dim=0)

                features.clear()
                feature_event.clear()
    finally:
        h_image.remove()
        h_event.remove()

    # Drop the dummy first row.
    return data_[1:], label_[1:]
-
def test_v2(model,train_dataset,args):  # image and text share one embedding space
    """Like ``test``, but hooks ``model.linear`` and reads two activations
    per batch — presumably the image and text projections into the shared
    space (i.e. the hooked layer fires twice per forward; TODO confirm
    against the model definition).

    :param model: trained module exposing a ``linear`` sub-module
    :param train_dataset: DataLoader yielding (image, label) batches
    :param args: namespace with ``pretrained_bert``, ``knowledge_path``
        and ``max_len``
    :return: (image features [N, 64], text features [N, 64], labels [N])
    """
    model.eval()

    tokenizer, bert = getTokenizer(args.pretrained_bert)

    loss_ = CrossEntropyLoss()

    with open(args.knowledge_path,'r',encoding='utf-8') as f:
        knowledge_dict = json.load(f)

    # Dummy first rows, stripped off before returning.
    data_ = torch.zeros([1,64])
    data__ = torch.zeros([1, 64])
    label_ = torch.zeros([1])

    '''capture intermediate activations with a forward hook'''
    features = []

    def hook_image(net, input, output):
        features.append(output)

    # Keep the handle so the hook can be removed afterwards; otherwise
    # repeated calls would stack duplicate hooks on the model.
    handle = model.linear.register_forward_hook(hook_image)

    try:
        with torch.no_grad():

            for data in tqdm(train_dataset):

                # image batch
                image, label = data
                image = image.to(device)
                label = label.to(device)

                t1,t2,t3 = getKnowledge(knowledge_dict,label)

                # Encode the first triple per label with the pretrained BERT.
                encodedevent1 = torch.cat([getEncodedtext(t,args.max_len,tokenizer,bert) for t in t1]).to(device)

                # Forward pass is run only for its hook side effects.
                model(loss_, label, image, encodedevent1)

                data_ = torch.cat([data_,features[0]],dim=0)   # image
                data__ = torch.cat([data__,features[1]],dim=0) # text
                label_ = torch.cat([label_,label],dim=0)

                features.clear()
    finally:
        handle.remove()

    # Drop the dummy first rows.
    return data_[1:], data__[1:], label_[1:]
-
def test_0312(model,train_dataset,tokenizer,args):
    """Run *model* over *train_dataset*, collecting the image and event
    features it returns directly (no forward hooks needed for this model).

    :param model: module whose forward returns (image_features, event_features)
    :param train_dataset: DataLoader yielding (image, label, event) batches
    :param tokenizer: HuggingFace tokenizer for the event text
    :param args: parsed CLI arguments (unused here; kept for a signature
        uniform with the other test_* helpers)
    :return: (image features [N, 64], event features [N, 64], labels [N])
    """
    model.eval()

    # Dummy first rows so torch.cat has something to append to;
    # they are stripped off before returning.
    data_ = torch.zeros([1,64])
    data__ = torch.zeros([1, 64])
    label_ = torch.zeros([1])

    # Inference only: no_grad skips building the autograd graph
    # (outputs were already detached, so results are unchanged).
    with torch.no_grad():
        for data in tqdm(train_dataset):

            image,label,event = data
            image = image.to(device)
            label = label.to(device)

            event_input1 = tokenizer(event, padding='longest', truncation=True, max_length=25, return_tensors="pt").to(device)

            i ,e = model(image, event_input1,label)

            data_ = torch.cat([data_, i.detach().cpu()], dim=0)    # image
            data__ = torch.cat([data__, e.detach().cpu()], dim=0)  # event
            label_ = torch.cat([label_, label.detach().cpu()], dim=0)

    # Drop the dummy first rows.
    return data_[1:], data__[1:], label_[1:]
-
# Entry point: dump aligned image/event features for every saved checkpoint.
if __name__ == "__main__" :

    parser = argparse.ArgumentParser(description='Model Controller')

    parser.add_argument('--mode',type=str,default='pre_train',help='pre_train/test/train')

    #parser.add_argument('--modelname', type=str, default=r"/model/weights", help='the path saving weights')

    parser.add_argument('--modelname', type=str, default="", help='the path saving weights')
    parser.add_argument('--image_path',type=str,default="/dataset/X_train.npy")
    parser.add_argument('--label_path', type=str, default="/dataset/Y_train.npy")
    parser.add_argument('--knowledge_path',type=str, default="/code/DataSet/Knowledge_v2.json")
    parser.add_argument('--config_path', type=str, default="/code/configs/config_bert.json")
    parser.add_argument('--visual_path', type=str, default="/result/0323")

    parser.add_argument('--pretrained_bert',type=str,default='/dataset/bert-base-chinese')
    parser.add_argument('--embed_dim',type=int,default=64)

    parser.add_argument('--batch', type=int, default=128)


    args = parser.parse_args()

    '''可视化对齐数据'''

    # mydataset = Dataset_df.MyDataset(args.image_path,args.label_path)
    mydataset = MyDataset_v2(args.image_path, args.label_path, args.knowledge_path)

    # shuffle=False so features stay aligned with the on-disk label order.
    data_loader = DataLoader(dataset=mydataset,batch_size=args.batch, shuffle=False, pin_memory=True)

    model_path = r"/model/weights"

    model_list = os.listdir(r"/model/weights")
    #print(args.weights_path)
    print(len(model_list))

    # Extract and save features once per checkpoint file.
    for pretrain in model_list:

        print("========={}==========".format(pretrain))

        # model = Module.MyModuel_v4().to(device)
        model = Module.MyModule_v8(args).to(device)

        model.load_state_dict(torch.load(os.path.join(model_path,pretrain),map_location=device))

        tokenizer = BertTokenizer.from_pretrained(args.pretrained_bert)

        # data1,data2,label = test_v2(model,data_loader,args)
        data1, data2, label = test_0312(model, data_loader, tokenizer,args)
        # data1,data2,label = test(model,data_loader,args)

        # Strip the extension: splits at the first ".p" — presumably the
        # checkpoints are ".pt"/".pth" files; verify against /model/weights.
        name = pretrain.split('.p')[0]

        if not os.path.exists(os.path.join(args.visual_path,name)):
            os.makedirs(os.path.join(args.visual_path,name))

        # Persist per-checkpoint feature tensors for later t-SNE plotting.
        torch.save(data1,os.path.join(os.path.join(args.visual_path,name),name+'_image.pt'))
        torch.save(data2,os.path.join(os.path.join(args.visual_path,name),name+'_event.pt'))
        torch.save(label,os.path.join(os.path.join(args.visual_path,name),name+'_label.pt'))

        # data = torch.load(os.path.join(args.feature_path,'epoch_48_text_semantic.pt'))
        # label = torch.load(os.path.join(args.feature_path,'epoch_48_label.pt'))

        # for i in range(100):
        #     print(data[i],label[i])

        '''调用可视化函数'''
        #main(data2.numpy(),label,os.path.join(os.path.join(args.visual_path,name),name+'_event.png'))
        #main(data1.numpy(),label,os.path.join(os.path.join(args.visual_path,name),name+'_image.png'))
|