- """
- @Author : Lee, Qin
- @StartTime : 2018/08/13
- @Filename : module.py
- @Software : Pycharm
- @Framework : Pytorch
- @LastModify : 2019/05/07
- """
-
- import math
-
- import torch
- import torch.nn as nn
- import torch.nn.functional as F
- from torch.nn.utils.rnn import pack_padded_sequence
- from torch.nn.utils.rnn import pad_packed_sequence
-
-
- class ModelManager(nn.Module):
-
- def __init__(self, args, num_word, num_slot, num_intent):
- super(ModelManager, self).__init__()
-
- self.__num_word = num_word
- self.__num_slot = num_slot
- self.__num_intent = num_intent
- self.__args = args
-
- # Initialize an embedding object.
- self.__embedding = EmbeddingCollection(
- self.__num_word,
- self.__args.word_embedding_dim
- )
-
- # Initialize an LSTM Encoder object.
- self.__encoder = LSTMEncoder(
- self.__args.word_embedding_dim,
- self.__args.encoder_hidden_dim,
- self.__args.dropout_rate
- )
-
- # Initialize a self-attention layer.
- self.__attention = SelfAttention(
- self.__args.word_embedding_dim,
- self.__args.attention_hidden_dim,
- self.__args.attention_output_dim,
- self.__args.dropout_rate
- )
-
- # Initialize a Decoder object for intent.
- self.__intent_decoder = LSTMDecoder(
- self.__args.encoder_hidden_dim + self.__args.attention_output_dim,
- self.__args.intent_decoder_hidden_dim,
- self.__num_intent, self.__args.dropout_rate,
- embedding_dim=self.__args.intent_embedding_dim
- )
- # Initialize a Decoder object for slot.
- self.__slot_decoder = LSTMDecoder(
- self.__args.encoder_hidden_dim + self.__args.attention_output_dim,
- self.__args.slot_decoder_hidden_dim,
- self.__num_slot, self.__args.dropout_rate,
- embedding_dim=self.__args.slot_embedding_dim,
- extra_dim=self.__num_intent
- )
-
- # One-hot intent embedding used to feed the (predicted or golden) intent to the slot decoder.
- self.__intent_embedding = nn.Embedding(
- self.__num_intent, self.__num_intent
- )
- self.__intent_embedding.weight.data = torch.eye(self.__num_intent)
- self.__intent_embedding.weight.requires_grad = False
-
- def show_summary(self):
- """
- Print a summary of the defined model.
- """
-
- print('Model parameters are listed as follows:\n')
-
- print('\tnumber of words: {};'.format(self.__num_word))
- print('\tnumber of slots: {};'.format(self.__num_slot))
- print('\tnumber of intents: {};'.format(self.__num_intent))
- print('\tword embedding dimension: {};'.format(self.__args.word_embedding_dim))
- print('\tencoder hidden dimension: {};'.format(self.__args.encoder_hidden_dim))
- print('\tdimension of intent embedding: {};'.format(self.__args.intent_embedding_dim))
- print('\tdimension of slot embedding: {};'.format(self.__args.slot_embedding_dim))
- print('\tdimension of slot decoder hidden: {};'.format(self.__args.slot_decoder_hidden_dim))
- print('\tdimension of intent decoder hidden: {};'.format(self.__args.intent_decoder_hidden_dim))
- print('\thidden dimension of self-attention: {};'.format(self.__args.attention_hidden_dim))
- print('\toutput dimension of self-attention: {};'.format(self.__args.attention_output_dim))
-
- print('\nEnd of parameter summary. Now training begins.\n\n')
-
- def forward(self, text, seq_lens, n_predicts=None, forced_slot=None, forced_intent=None):
- word_tensor, _ = self.__embedding(text)
-
- lstm_hiddens = self.__encoder(word_tensor, seq_lens)
- # transformer_hiddens = self.__transformer(pos_tensor, seq_lens)
- attention_hiddens = self.__attention(word_tensor, seq_lens)
- hiddens = torch.cat([attention_hiddens, lstm_hiddens], dim=1)
-
- pred_intent = self.__intent_decoder(
- hiddens, seq_lens,
- forced_input=forced_intent
- )
-
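- # Feed the predicted intent to the slot decoder: either as a hard (argmax)
- # one-hot embedding, or as the raw intent logits when the pipeline must
- # stay differentiable end-to-end.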
- if not self.__args.differentiable:
- _, idx_intent = pred_intent.topk(1, dim=-1)
- feed_intent = self.__intent_embedding(idx_intent.squeeze(1))
- else:
- feed_intent = pred_intent
-
- pred_slot = self.__slot_decoder(
- hiddens, seq_lens,
- forced_input=forced_slot,
- extra_input=feed_intent
- )
-
- if n_predicts is None:
- return F.log_softmax(pred_slot, dim=1), F.log_softmax(pred_intent, dim=1)
- else:
- _, slot_index = pred_slot.topk(n_predicts, dim=1)
- _, intent_index = pred_intent.topk(n_predicts, dim=1)
-
- return slot_index.cpu().data.numpy().tolist(), intent_index.cpu().data.numpy().tolist()
-
- def golden_intent_predict_slot(self, text, seq_lens, golden_intent, n_predicts=1):
- word_tensor, _ = self.__embedding(text)
- embed_intent = self.__intent_embedding(golden_intent)
-
- lstm_hiddens = self.__encoder(word_tensor, seq_lens)
- attention_hiddens = self.__attention(word_tensor, seq_lens)
- hiddens = torch.cat([attention_hiddens, lstm_hiddens], dim=1)
-
- pred_slot = self.__slot_decoder(
- hiddens, seq_lens, extra_input=embed_intent
- )
- _, slot_index = pred_slot.topk(n_predicts, dim=-1)
-
- # By default, only the single best slot label per token is returned.
- return slot_index.cpu().data.numpy().tolist()
-
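- # A minimal usage sketch (illustrative, not from the original code). It
- # assumes an argparse.Namespace exposing the hyper-parameters accessed
- # above and a LongTensor batch of padded word ids whose sentences are
- # sorted by length in descending order; all sizes below are made up.
- #
- #   args = argparse.Namespace(
- #       word_embedding_dim=64, encoder_hidden_dim=256,
- #       attention_hidden_dim=1024, attention_output_dim=128,
- #       intent_decoder_hidden_dim=64, intent_embedding_dim=8,
- #       slot_decoder_hidden_dim=64, slot_embedding_dim=32,
- #       dropout_rate=0.4, differentiable=False
- #   )
- #   model = ModelManager(args, num_word=1000, num_slot=10, num_intent=5)
- #   slot_idx, intent_idx = model(padded_text, seq_lens, n_predicts=1)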
-
- class EmbeddingCollection(nn.Module):
- """
- Provide word vector encoding (the sinusoidal position encoding below is currently disabled).
- """
-
- def __init__(self, input_dim, embedding_dim, max_len=5000):
- super(EmbeddingCollection, self).__init__()
-
- self.__input_dim = input_dim
- # Here embedding_dim must be an even number, as required by the (currently disabled) sinusoidal position encoding.
- self.__embedding_dim = embedding_dim
- self.__max_len = max_len
-
- # Word vector encoder.
- self.__embedding_layer = nn.Embedding(
- self.__input_dim, self.__embedding_dim
- )
-
- # Position vector encoder.
- # self.__position_layer = torch.zeros(self.__max_len, self.__embedding_dim)
- # position = torch.arange(0, self.__max_len).unsqueeze(1)
- # div_term = torch.exp(torch.arange(0, self.__embedding_dim, 2) *
- # (-math.log(10000.0) / self.__embedding_dim))
-
- # Sine wave curve design.
- # self.__position_layer[:, 0::2] = torch.sin(position * div_term)
- # self.__position_layer[:, 1::2] = torch.cos(position * div_term)
- #
- # self.__position_layer = self.__position_layer.unsqueeze(0)
- # self.register_buffer('pe', self.__position_layer)
-
- def forward(self, input_x):
- # Get word vector encoding.
- embedding_x = self.__embedding_layer(input_x)
-
- # Get position encoding.
- # position_x = Variable(self.pe[:, :input_x.size(1)], requires_grad=False)
-
- # With the position encoding disabled, the word embedding is returned twice to keep the (word, position) interface.
- return embedding_x, embedding_x
-
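- # Usage sketch (illustrative only): word ids of shape (batch, max_len) map
- # to embeddings of shape (batch, max_len, embedding_dim); the second return
- # value would carry position information if the sinusoidal encoding above
- # were re-enabled.
- #
- #   emb = EmbeddingCollection(input_dim=1000, embedding_dim=64)
- #   word_x, pos_x = emb(torch.randint(0, 1000, (2, 5)))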
-
- class LSTMEncoder(nn.Module):
- """
- Encoder structure based on bidirectional LSTM.
- """
-
- def __init__(self, embedding_dim, hidden_dim, dropout_rate):
- super(LSTMEncoder, self).__init__()
-
- # Parameter recording.
- self.__embedding_dim = embedding_dim
- self.__hidden_dim = hidden_dim // 2  # Per-direction size; the bidirectional outputs concatenate back to hidden_dim.
- self.__dropout_rate = dropout_rate
-
- # Network attributes.
- self.__dropout_layer = nn.Dropout(self.__dropout_rate)
- self.__lstm_layer = nn.LSTM(
- input_size=self.__embedding_dim,
- hidden_size=self.__hidden_dim,
- batch_first=True,
- bidirectional=True,
- dropout=self.__dropout_rate,
- num_layers=1
- )
-
- def forward(self, embedded_text, seq_lens):
- """ Forward process for LSTM Encoder.
-
- (batch_size, max_sent_len)
- -> (batch_size, max_sent_len, word_dim)
- -> (batch_size, max_sent_len, hidden_dim)
- -> (total_word_num, hidden_dim)
-
- :param embedded_text: padded and embedded input text.
- :param seq_lens: lengths of the original (unpadded) input sequences.
- :return: encoded hidden vectors for every non-padding token, flattened across the batch.
- """
-
- # Apply dropout to the padded, embedded input.
- dropout_text = self.__dropout_layer(embedded_text)
-
- # Pack/pad process for variable-length input; seq_lens are assumed to be sorted in descending order.
- packed_text = pack_padded_sequence(dropout_text, seq_lens, batch_first=True)
- lstm_hiddens, (h_last, c_last) = self.__lstm_layer(packed_text)
- padded_hiddens, _ = pad_packed_sequence(lstm_hiddens, batch_first=True)
-
- return torch.cat([padded_hiddens[i][:seq_lens[i], :] for i in range(0, len(seq_lens))], dim=0)
-
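- # Shape sketch (illustrative only, all sizes assumed): a padded batch of
- # shape (2, 4, 16) with seq_lens [4, 3] is flattened to (4 + 3, hidden_dim).
- #
- #   enc = LSTMEncoder(embedding_dim=16, hidden_dim=32, dropout_rate=0.1)
- #   enc(torch.randn(2, 4, 16), [4, 3]).shape   # -> torch.Size([7, 32])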
-
- class LSTMDecoder(nn.Module):
- """
- Decoder structure based on unidirectional LSTM.
- """
-
- def __init__(self, input_dim, hidden_dim, output_dim, dropout_rate, embedding_dim=None, extra_dim=None):
- """ Construction function for Decoder.
-
- :param input_dim: input dimension of the Decoder; in this model, the encoder hidden size plus the attention output size.
- :param hidden_dim: hidden dimension of the iterative LSTM.
- :param output_dim: output dimension of the Decoder; in practice, the total number of intents or slots.
- :param dropout_rate: dropout rate applied to the decoder input.
- :param embedding_dim: if not None, the previous prediction is embedded and fed back as part of the next input.
- :param extra_dim: if not None, the decoder also receives an extra information tensor (e.g. the intent representation) of this size.
- """
-
- super(LSTMDecoder, self).__init__()
-
- self.__input_dim = input_dim
- self.__hidden_dim = hidden_dim
- self.__output_dim = output_dim
- self.__dropout_rate = dropout_rate
- self.__embedding_dim = embedding_dim
- self.__extra_dim = extra_dim
-
- # If embedding_dim is not None, the previous output is embedded
- # and fed back as part of the next input.
- if self.__embedding_dim is not None:
- self.__embedding_layer = nn.Embedding(output_dim, embedding_dim)
- self.__init_tensor = nn.Parameter(
- torch.randn(1, self.__embedding_dim),
- requires_grad=True
- )
-
- # Determine the input dimension of the iterative LSTM.
- if self.__extra_dim is not None and self.__embedding_dim is not None:
- lstm_input_dim = self.__input_dim + self.__extra_dim + self.__embedding_dim
- elif self.__extra_dim is not None:
- lstm_input_dim = self.__input_dim + self.__extra_dim
- elif self.__embedding_dim is not None:
- lstm_input_dim = self.__input_dim + self.__embedding_dim
- else:
- lstm_input_dim = self.__input_dim
-
- # Network parameter definition.
- self.__dropout_layer = nn.Dropout(self.__dropout_rate)
- self.__lstm_layer = nn.LSTM(
- input_size=lstm_input_dim,
- hidden_size=self.__hidden_dim,
- batch_first=True,
- bidirectional=False,
- dropout=self.__dropout_rate,
- num_layers=1
- )
- self.__linear_layer = nn.Linear(
- self.__hidden_dim,
- self.__output_dim
- )
-
- def forward(self, encoded_hiddens, seq_lens, forced_input=None, extra_input=None):
- """ Forward process for decoder.
-
- :param encoded_hiddens: encoded hidden tensors produced by the encoder.
- :param seq_lens: a list containing the length of each sentence.
- :param forced_input: gold labels used for teacher forcing, if provided.
- :param extra_input: information tensor coming from another decoder.
- :return: unnormalized scores over the prediction labels.
- """
-
- # Concatenate the extra information tensor if provided.
- if extra_input is not None:
- input_tensor = torch.cat([encoded_hiddens, extra_input], dim=1)
- else:
- input_tensor = encoded_hiddens
-
- output_tensor_list, sent_start_pos = [], 0
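- # Two decoding modes: when no feedback embedding is used, or when gold
- # labels are supplied (teacher forcing), each sentence is decoded in a
- # single LSTM pass; otherwise decoding is autoregressive, feeding the
- # embedded argmax prediction of each step into the next step.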
- if self.__embedding_dim is None or forced_input is not None:
-
- for sent_i in range(0, len(seq_lens)):
- sent_end_pos = sent_start_pos + seq_lens[sent_i]
-
- # Segment input hidden tensors.
- seg_hiddens = input_tensor[sent_start_pos: sent_end_pos, :]
-
- if self.__embedding_dim is not None and forced_input is not None:
- if seq_lens[sent_i] > 1:
- seg_forced_input = forced_input[sent_start_pos: sent_end_pos]
- seg_forced_tensor = self.__embedding_layer(seg_forced_input).view(seq_lens[sent_i], -1)
- seg_prev_tensor = torch.cat([self.__init_tensor, seg_forced_tensor[:-1, :]], dim=0)
- else:
- seg_prev_tensor = self.__init_tensor
-
- # Concatenate forced target tensor.
- combined_input = torch.cat([seg_hiddens, seg_prev_tensor], dim=1)
- else:
- combined_input = seg_hiddens
- dropout_input = self.__dropout_layer(combined_input)
-
- lstm_out, _ = self.__lstm_layer(dropout_input.view(1, seq_lens[sent_i], -1))
- linear_out = self.__linear_layer(lstm_out.view(seq_lens[sent_i], -1))
-
- output_tensor_list.append(linear_out)
- sent_start_pos = sent_end_pos
- else:
- for sent_i in range(0, len(seq_lens)):
- prev_tensor = self.__init_tensor
-
- # Remember the h and c states so that each step's prediction
- # conditions the next decoding step.
- last_h, last_c = None, None
-
- sent_end_pos = sent_start_pos + seq_lens[sent_i]
- for word_i in range(sent_start_pos, sent_end_pos):
- seg_input = input_tensor[[word_i], :]
- combined_input = torch.cat([seg_input, prev_tensor], dim=1)
- dropout_input = self.__dropout_layer(combined_input).view(1, 1, -1)
-
- if last_h is None and last_c is None:
- lstm_out, (last_h, last_c) = self.__lstm_layer(dropout_input)
- else:
- lstm_out, (last_h, last_c) = self.__lstm_layer(dropout_input, (last_h, last_c))
-
- lstm_out = self.__linear_layer(lstm_out.view(1, -1))
- output_tensor_list.append(lstm_out)
-
- _, index = lstm_out.topk(1, dim=1)
- prev_tensor = self.__embedding_layer(index).view(1, -1)
- sent_start_pos = sent_end_pos
-
- return torch.cat(output_tensor_list, dim=0)
-
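- # Usage sketch (illustrative only, all sizes assumed): 7 encoded token
- # vectors for two sentences of lengths [4, 3], decoded over 10 labels.
- # Passing forced_input switches from free-running to teacher-forced decoding.
- #
- #   dec = LSTMDecoder(input_dim=384, hidden_dim=64, output_dim=10,
- #                     dropout_rate=0.4, embedding_dim=32)
- #   free_run = dec(torch.randn(7, 384), [4, 3])
- #   forced = dec(torch.randn(7, 384), [4, 3],
- #                forced_input=torch.randint(0, 10, (7,)))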
-
- class QKVAttention(nn.Module):
- """
- Attention mechanism based on the Query-Key-Value architecture. In
- particular, when query == key == value, it is self-attention.
- """
-
- def __init__(self, query_dim, key_dim, value_dim, hidden_dim, output_dim, dropout_rate):
- super(QKVAttention, self).__init__()
-
- # Record hyper-parameters.
- self.__query_dim = query_dim
- self.__key_dim = key_dim
- self.__value_dim = value_dim
- self.__hidden_dim = hidden_dim
- self.__output_dim = output_dim
- self.__dropout_rate = dropout_rate
-
- # Declare network structures.
- self.__query_layer = nn.Linear(self.__query_dim, self.__hidden_dim)
- self.__key_layer = nn.Linear(self.__key_dim, self.__hidden_dim)
- self.__value_layer = nn.Linear(self.__value_dim, self.__output_dim)
- self.__dropout_layer = nn.Dropout(p=self.__dropout_rate)
-
- def forward(self, input_query, input_key, input_value):
- """ The forward propagation of attention.
-
- Here we require the first dimensions of the input key
- and value to be equal.
-
- :param input_query: query tensor, (n, d_q)
- :param input_key: key tensor, (m, d_k)
- :param input_value: value tensor, (m, d_v)
- :return: attention-weighted tensor, (n, d_o)
- """
-
- # Linear transforms projecting query, key, and value to the target dimensions.
- linear_query = self.__query_layer(input_query)
- linear_key = self.__key_layer(input_key)
- linear_value = self.__value_layer(input_value)
-
- score_tensor = F.softmax(torch.matmul(
- linear_query,
- linear_key.transpose(-2, -1)
- ) / math.sqrt(self.__hidden_dim), dim=-1)
- forced_tensor = torch.matmul(score_tensor, linear_value)
- forced_tensor = self.__dropout_layer(forced_tensor)
-
- return forced_tensor
-
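- # Usage sketch (illustrative only): computes scaled dot-product attention,
- # softmax(Q K^T / sqrt(hidden_dim)) V, over 5 query rows and 7 key/value rows.
- #
- #   attn = QKVAttention(query_dim=16, key_dim=16, value_dim=16,
- #                       hidden_dim=32, output_dim=24, dropout_rate=0.1)
- #   attn(torch.randn(5, 16), torch.randn(7, 16), torch.randn(7, 16)).shape
- #   # -> torch.Size([5, 24])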
-
- class SelfAttention(nn.Module):
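- """
- Self-attention layer: QKVAttention applied with query, key and value all
- taken from the same (dropout-regularised) input.
- """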
-
- def __init__(self, input_dim, hidden_dim, output_dim, dropout_rate):
- super(SelfAttention, self).__init__()
-
- # Record parameters.
- self.__input_dim = input_dim
- self.__hidden_dim = hidden_dim
- self.__output_dim = output_dim
- self.__dropout_rate = dropout_rate
-
- # Record network parameters.
- self.__dropout_layer = nn.Dropout(self.__dropout_rate)
- self.__attention_layer = QKVAttention(
- self.__input_dim, self.__input_dim, self.__input_dim,
- self.__hidden_dim, self.__output_dim, self.__dropout_rate
- )
-
- def forward(self, input_x, seq_lens):
- dropout_x = self.__dropout_layer(input_x)
- attention_x = self.__attention_layer(
- dropout_x, dropout_x, dropout_x
- )
-
- flat_x = torch.cat(
- [attention_x[i][:seq_lens[i], :] for
- i in range(0, len(seq_lens))], dim=0
- )
- return flat_x
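-
-
- if __name__ == '__main__':
-     # Minimal smoke test (not part of the original module). It assumes a toy
-     # batch of two sentences with descending lengths [4, 3] and 16-dimensional
-     # embeddings, and only checks the flattened output shapes.
-     dummy_batch, dummy_lens = torch.randn(2, 4, 16), [4, 3]
-
-     toy_encoder = LSTMEncoder(embedding_dim=16, hidden_dim=32, dropout_rate=0.1)
-     toy_attention = SelfAttention(input_dim=16, hidden_dim=8, output_dim=24, dropout_rate=0.1)
-
-     print(toy_encoder(dummy_batch, dummy_lens).shape)    # torch.Size([7, 32])
-     print(toy_attention(dummy_batch, dummy_lens).shape)  # torch.Size([7, 24])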