import numpy as np
import torch
import torch.nn.functional as F
from torch import nn
from transformers import AutoModel


class Config:
    """Configuration parameters."""

    def __init__(self):
        self.path = './'  # assumed project root; Model loads BERT from path + 'dataset/bert-base-uncased'
        self.epochs = 10  # number of epochs
        self.batch_size = 32  # mini-batch size
        self.pad_size = 200  # length each sentence is processed to (pad short, truncate long)
        self.learning_rate = 5e-5  # learning rate
        self.emb_size = 768
        self.dropout = 0.25
        self.warm_up_ratio = 0.1
        # capsule parameters
        self.num_compressed_capsule = 128  # number of compressed capsules
        self.dim_capsule = 16  # dimensionality of each capsule
        self.Routing = 'Adaptive_KDE_routing'  # options: Adaptive_KDE_routing, dynamic_routing, KDE_routing, my_routing
        # multi-head attention parameters
        self.dim_model = 300
        self.num_head = 6
        # GRU
        self.hidden_size = 150
        self.num_layers = 2


class Model(nn.Module):
    def __init__(self, config, embeddings, num_classes, label_embeddings):
        # embeddings / label_embeddings are unused here; kept for interface compatibility.
        super().__init__()
        self.bert = AutoModel.from_pretrained(config.path + 'dataset/bert-base-uncased', output_hidden_states=True)

        # BiGRU over the BERT token embeddings: emb_size -> 2 * hidden_size (= 300)
        self.BiGRU = nn.GRU(config.emb_size, config.hidden_size, config.num_layers, bidirectional=True,
                            batch_first=True,
                            dropout=config.dropout)

        # primary capsule layer
        self.primary_capsules_doc = PrimaryCaps(num_capsules=config.dim_capsule, in_channels=config.pad_size,
                                                out_channels=32,
                                                kernel_size=1, stride=1)
        # 3D attention
        self.attention = TGMandTRM(16)
        # flatten capsules
        self.flatten_capsules = FlattenCaps()
        # W_doc compresses the flattened capsules: 19200 = 32 capsules * 600 features after the attention concat
        self.W_doc = nn.Parameter(torch.FloatTensor(19200, config.num_compressed_capsule))
        torch.nn.init.xavier_uniform_(self.W_doc)
        # fully connected capsule layer
        self.fc_capsules_doc_child = FCCaps(config, output_capsule_num=num_classes,
                                            input_capsule_num=config.num_compressed_capsule,
                                            in_channels=config.dim_capsule, out_channels=config.dim_capsule)
        self.dropout = nn.Dropout(config.dropout)

    def compression(self, poses, W):
        # project the flattened capsules down to num_compressed_capsule capsules
        poses = torch.matmul(poses.permute(0, 2, 1), W).permute(0, 2, 1)
        activations = torch.sqrt((poses ** 2).sum(2))
        return poses, activations

    def forward(self, input_ids, attention_mask, token_type_ids, label_index):  # input_ids: [B, pad_size]
        bert_output = self.bert(input_ids=input_ids, token_type_ids=token_type_ids, attention_mask=attention_mask)
        content1 = bert_output['last_hidden_state']
        content1 = self.dropout(content1)

        nets_doc, _ = self.BiGRU(content1)  # [B, pad_size, 300]

        poses_doc = self.primary_capsules_doc(nets_doc)  # [B, 16, 32, 300]
        poses_doc = self.attention(poses_doc)  # [B, 16, 32, 600]
        poses = self.flatten_capsules(poses_doc)  # [B, 19200, 16]
        poses, activations = self.compression(poses, self.W_doc)  # poses: [B, 128, 16], activations: [B, 128]
        poses, activations = self.fc_capsules_doc_child(poses, label_index)
        activations = activations.squeeze(2)  # [B, len(label_index)]
        return activations


class PrimaryCaps(nn.Module):
    def __init__(self, num_capsules, in_channels, out_channels, kernel_size, stride):
        super().__init__()

        self.capsules = nn.Conv1d(in_channels, out_channels * num_capsules, kernel_size, stride)
        torch.nn.init.xavier_uniform_(self.capsules.weight)

        self.conv1d1 = nn.Conv1d(512, 1024, 3, padding=1, stride=1)
        self.bn1 = nn.BatchNorm1d(1024)
        self.conv1d2 = nn.Conv1d(1024, 512, 5, padding=2, stride=1)
        self.bn2 = nn.BatchNorm1d(512)

        self.leakyrelu = nn.LeakyReLU()
        self.bn = nn.BatchNorm1d(512)

        self.out_channels = out_channels
        self.num_capsules = num_capsules

    def forward(self, x):  # x: [B, pad_size, 300]
        batch_size = x.size(0)
        x = self.leakyrelu(self.bn(self.capsules(x)))  # [B, 512, 300]

        t = self.leakyrelu(self.bn1(self.conv1d1(x)))
        t = self.leakyrelu(self.bn2(self.conv1d2(t)))

        # residual merge
        u = t + x
        u = self.leakyrelu(u)

        u = u.view(batch_size, self.num_capsules, self.out_channels, -1)  # [B, 16, 32, 300]
        poses = squash_v1(u, axis=1)
        return poses
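

# A minimal shape-level sketch of PrimaryCaps (not part of the original model):
# with the Config defaults it maps a BiGRU output of [B, pad_size, 300] to
# [B, dim_capsule, 32, 300] capsule poses. `_primary_caps_demo` is a
# hypothetical helper added here for illustration only.
def _primary_caps_demo():
    cfg = Config()
    pc = PrimaryCaps(num_capsules=cfg.dim_capsule, in_channels=cfg.pad_size,
                     out_channels=32, kernel_size=1, stride=1)
    x = torch.randn(4, cfg.pad_size, 2 * cfg.hidden_size)  # [4, 200, 300]
    print(pc(x).shape)  # expected: torch.Size([4, 16, 32, 300])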


class FlattenCaps(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, p):
        poses = p.view(p.size(0), p.size(2) * p.size(3), -1)  # [B, 32 * 600, 16] = [B, 19200, 16]
        return poses


def Adaptive_KDE_routing(batch_size, b_ij, u_hat):
    # iterate routing until the KDE-style loss changes by less than 0.05
    last_loss = 0.0
    while True:
        # routing coefficients: softmax over output capsules, normalized over input capsules
        c_ij = F.softmax(b_ij, dim=2).unsqueeze(4)
        c_ij = c_ij / c_ij.sum(dim=1, keepdim=True)
        v_j = squash_v1((c_ij * u_hat).sum(dim=1, keepdim=True), axis=3)
        # agreement between each squashed prediction and the output capsule
        dd = 1 - ((squash_v1(u_hat, axis=3) - v_j) ** 2).sum(3)
        b_ij = b_ij + dd

        c_ij = c_ij.view(batch_size, c_ij.size(1), c_ij.size(2))
        dd = dd.view(batch_size, dd.size(1), dd.size(2))

        kde_loss = torch.mul(c_ij, dd).sum() / batch_size
        kde_loss = np.log(kde_loss.item())

        if abs(kde_loss - last_loss) < 0.05:
            break
        last_loss = kde_loss
    poses = v_j.squeeze(1)
    activations = torch.sqrt((poses ** 2).sum(2))
    return poses, activations


def squash_v1(x, axis):
    # squared norm along `axis`, keeping dims so the scale broadcasts back onto x
    s_squared_norm = (x ** 2).sum(axis, keepdim=True)
    scale = torch.sqrt(s_squared_norm) / (0.5 + s_squared_norm)
    return scale * x
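

# A quick numeric check of squash_v1 (illustrative only): the scale
# sqrt(n) / (0.5 + n), for squared norm n, preserves direction and bounds the
# output norm by n / (0.5 + n) < 1. For a vector of norm 5 (n = 25) the scale
# is 5 / 25.5 ≈ 0.196, giving an output norm of about 0.98.
def _squash_demo():
    v = torch.tensor([[3.0, 4.0]])  # norm 5
    out = squash_v1(v, axis=1)
    print(out, out.norm(dim=1))  # direction of (3, 4) preserved, norm ≈ 0.98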


class FCCaps(nn.Module):
    def __init__(self, args, output_capsule_num, input_capsule_num, in_channels, out_channels):
        super().__init__()

        self.in_channels = in_channels
        self.out_channels = out_channels
        self.input_capsule_num = input_capsule_num
        self.output_capsule_num = output_capsule_num

        # transformation tensor: [1, input_capsule_num, output_capsule_num, out_channels, in_channels]
        self.W1 = nn.Parameter(torch.FloatTensor(1, input_capsule_num, output_capsule_num, out_channels,
                                                 in_channels))
        torch.nn.init.xavier_uniform_(self.W1)

        self.Routing = args.Routing
        self.sigmoid = nn.Sigmoid()

    def forward(self, x, label_index):
        batch_size = x.size(0)
        variable_output_capsule_num = len(label_index)
        W1 = self.W1[:, :, label_index, :, :]  # [1, input_capsule_num, len(label_index), 16, 16]

        x = torch.stack([x] * variable_output_capsule_num, dim=2).unsqueeze(4)  # [B, input_capsule_num, len(label_index), 16, 1]

        W1 = W1.repeat(batch_size, 1, 1, 1, 1)
        u_hat = torch.matmul(W1, x)  # prediction vectors

        b_ij = torch.zeros(batch_size, self.input_capsule_num, variable_output_capsule_num, 1, device=x.device)

        if self.Routing == 'Adaptive_KDE_routing':
            poses, activations = Adaptive_KDE_routing(batch_size, b_ij, u_hat)
        else:
            raise NotImplementedError(f'unsupported routing scheme: {self.Routing}')
        return poses, activations
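

# A standalone usage sketch for FCCaps (hypothetical helper, independent of
# BERT): route 128 compressed capsules to a 4-label subset of a 10-class
# transformation tensor via label_index. The routing loop is expected to
# converge on random inputs, though its stopping rule is heuristic.
def _fccaps_demo():
    cfg = Config()
    caps = FCCaps(cfg, output_capsule_num=10,
                  input_capsule_num=cfg.num_compressed_capsule,
                  in_channels=cfg.dim_capsule, out_channels=cfg.dim_capsule)
    x = torch.randn(2, cfg.num_compressed_capsule, cfg.dim_capsule)
    poses, activations = caps(x, label_index=[0, 3, 5, 7])
    print(poses.shape, activations.shape)  # [2, 4, 16, 1], [2, 4, 1]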


class TGMandTRM(nn.Module):
    def __init__(self, h):
        super().__init__()
        self.rank = 80
        self.ps = [1, 1, 1, 1]
        self.h = h
        conv1_1, conv1_2, conv1_3 = self.ConvGeneration(self.rank)

        self.conv1_1 = conv1_1
        self.conv1_2 = conv1_2
        self.conv1_3 = conv1_3

        self.leakyrelu = nn.LeakyReLU()

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight.data, mode='fan_out')

        # rank-mixing weights, registered as a parameter so they are actually
        # trained (creating them inside forward() would reset them every call)
        self.lam = nn.Parameter(torch.ones(self.rank))

        self.w1 = nn.Parameter(torch.FloatTensor(1, 1, 32, 300))
        torch.nn.init.xavier_uniform_(self.w1)
        self.w2 = nn.Parameter(torch.FloatTensor(1, 1, 16, 300))
        torch.nn.init.xavier_uniform_(self.w2)
        self.w3 = nn.Parameter(torch.FloatTensor(1, 1, 32, 16))
        torch.nn.init.xavier_uniform_(self.w3)

    def forward(self, x):
        b, c, height, width = x.size()
        # project x onto its channel, height and width axes
        C = torch.einsum('bchw,dehw->bcde', x, self.w1)  # [B, 16, 1, 1]
        W = torch.einsum('bchw,decw->bhde', x, self.w2)  # [B, 32, 1, 1]
        H = torch.einsum('bchw,dehc->bwde', x, self.w3)  # [B, 300, 1, 1]

        lam = F.softmax(self.lam, -1)
        lam = torch.chunk(lam, dim=0, chunks=self.rank)
        terms = []
        for i in range(self.rank):
            t = self.TukerReconstruction(b, self.h, self.ps[0], self.conv1_1[i](C), self.conv1_2[i](H),
                                         self.conv1_3[i](W))
            terms.append(lam[i] * t)
        tensor1 = sum(terms)
        y = self.leakyrelu(x * tensor1)
        tensor1 = torch.cat((x, y), 3)  # concat along width: [B, 16, 32, 600]
        return tensor1

    def ConvGeneration(self, rank):
        # one 1x1 conv branch per rank for each of the three projected factors
        conv1 = []
        for _ in range(rank):
            conv1.append(nn.Sequential(
                nn.Conv2d(16, 16, kernel_size=1, bias=False),
                nn.LeakyReLU(),
            ))
        conv1 = nn.ModuleList(conv1)

        conv2 = []
        for _ in range(rank):
            conv2.append(nn.Sequential(
                nn.Conv2d(300, 300, kernel_size=1, bias=False),
                nn.LeakyReLU(),
            ))
        conv2 = nn.ModuleList(conv2)

        conv3 = []
        for _ in range(rank):
            conv3.append(nn.Sequential(
                nn.Conv2d(32, 32, kernel_size=1, bias=False),
                nn.LeakyReLU(),
            ))
        conv3 = nn.ModuleList(conv3)

        return conv1, conv2, conv3

    def TukerReconstruction(self, batch_size, h, ps, feat, feat2, feat3):
        # Tucker-style reconstruction: chain the three rank-ps factors back
        # into a [B, h, 2h, -1] tensor via batched matrix products
        b = batch_size
        C = feat.view(b, -1, ps)
        H = feat2.view(b, ps, -1)
        W = feat3.view(b, ps * ps, -1)
        CHW = torch.bmm(torch.bmm(C, H).view(b, -1, ps * ps), W).view(b, h, 2 * h, -1)
        return CHW
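

# A minimal end-to-end smoke test, assuming the BERT checkpoint is available
# at Config.path + 'dataset/bert-base-uncased' (adjust to your setup). It runs
# one forward pass on random token ids and prints one activation per label in
# label_index; 30522 is bert-base-uncased's vocabulary size.
if __name__ == '__main__':
    config = Config()
    num_classes = 10
    model = Model(config, embeddings=None, num_classes=num_classes,
                  label_embeddings=None)
    batch = 2
    input_ids = torch.randint(0, 30522, (batch, config.pad_size))
    attention_mask = torch.ones(batch, config.pad_size, dtype=torch.long)
    token_type_ids = torch.zeros(batch, config.pad_size, dtype=torch.long)
    with torch.no_grad():
        out = model(input_ids, attention_mask, token_type_ids,
                    label_index=list(range(num_classes)))
    print(out.shape)  # expected: torch.Size([2, 10])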