import time

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import transformers
from datasets import load_dataset
from torch.utils.data import DataLoader, TensorDataset


class custom_RNNCell(nn.Module):
    """A single vanilla (Elman) RNN cell: h_t = tanh(x_t @ Wx + h_{t-1} @ Wh + bh)."""

    def __init__(self, input_size: int, hidden_size: int, device='cpu'):
        super(custom_RNNCell, self).__init__()
        self.hidden_size = hidden_size
        self.device = device

        # Xavier/Glorot uniform limits, computed separately for the
        # input-to-hidden and hidden-to-hidden weight matrices.
        fan_in_Wx = input_size
        fan_out_Wx = hidden_size
        limit_Wx = np.sqrt(6 / (fan_in_Wx + fan_out_Wx))

        fan_in_Wh = hidden_size
        fan_out_Wh = hidden_size
        limit_Wh = np.sqrt(6 / (fan_in_Wh + fan_out_Wh))

        # Input-to-hidden weights, hidden-to-hidden weights, and hidden bias.
        self.Wx = nn.Parameter(torch.empty(input_size, hidden_size, device=device))
        self.Wh = nn.Parameter(torch.empty(hidden_size, hidden_size, device=device))
        self.bh = nn.Parameter(torch.zeros(hidden_size, device=device))

        nn.init.uniform_(self.Wx, -limit_Wx, limit_Wx)
        nn.init.uniform_(self.Wh, -limit_Wh, limit_Wh)

    def forward(self, input_t: torch.Tensor, h_prev: torch.Tensor) -> torch.Tensor:
        # input_t: (batch, input_size), h_prev: (batch, hidden_size)
        h_t = torch.tanh(torch.mm(input_t, self.Wx) + torch.mm(h_prev, self.Wh) + self.bh)
        return h_t


class custom_GRUCell(nn.Module):
    """A single GRU cell operating on the concatenation [x_t, h_{t-1}]."""

    def __init__(self, input_size: int, hidden_size: int, device='cpu'):
        super(custom_GRUCell, self).__init__()
        self.hidden_size = hidden_size
        self.device = device

        # Xavier/Glorot uniform limit; every gate sees [x_t, h_{t-1}] as input.
        fan_in = input_size + hidden_size
        fan_out = hidden_size
        limit = (6 / (fan_in + fan_out)) ** 0.5

        # Update gate, reset gate, and candidate-state weights.
        self.Wz = nn.Parameter(torch.empty(input_size + hidden_size, hidden_size, device=device))
        self.Wr = nn.Parameter(torch.empty(input_size + hidden_size, hidden_size, device=device))
        self.Wh = nn.Parameter(torch.empty(input_size + hidden_size, hidden_size, device=device))

        nn.init.uniform_(self.Wz, -limit, limit)
        nn.init.uniform_(self.Wr, -limit, limit)
        nn.init.uniform_(self.Wh, -limit, limit)

        # Gate biases start at zero.
        self.bz = nn.Parameter(torch.zeros(hidden_size, device=device))
        self.br = nn.Parameter(torch.zeros(hidden_size, device=device))
        self.bh = nn.Parameter(torch.zeros(hidden_size, device=device))

    def forward(self, input_t: torch.Tensor, h_prev: torch.Tensor) -> torch.Tensor:
        # input_t: (batch, input_size), h_prev: (batch, hidden_size)
        concat = torch.cat((input_t, h_prev), dim=1)

        # Update gate: how much of the candidate state is written.
        z_t = torch.sigmoid(torch.matmul(concat, self.Wz) + self.bz)

        # Reset gate: how much of the previous state feeds the candidate.
        r_t = torch.sigmoid(torch.matmul(concat, self.Wr) + self.br)

        # Candidate hidden state, computed from the reset-gated previous state.
        concat_reset = torch.cat((input_t, r_t * h_prev), dim=1)
        h_hat_t = torch.tanh(torch.matmul(concat_reset, self.Wh) + self.bh)

        # Convex combination of the previous state and the candidate.
        h_t = (1 - z_t) * h_prev + z_t * h_hat_t

        return h_t


class custom_LSTMCell(nn.Module):
    """A single LSTM cell operating on the concatenation [x_t, h_{t-1}]."""

    def __init__(self, input_size: int, hidden_size: int, device='cpu'):
        super(custom_LSTMCell, self).__init__()
        self.hidden_size = hidden_size
        self.device = device

        # Forget, input, candidate, and output gate weights.
        self.Wf = nn.Parameter(torch.empty(input_size + hidden_size, hidden_size, device=device))
        self.Wi = nn.Parameter(torch.empty(input_size + hidden_size, hidden_size, device=device))
        self.Wc = nn.Parameter(torch.empty(input_size + hidden_size, hidden_size, device=device))
        self.Wo = nn.Parameter(torch.empty(input_size + hidden_size, hidden_size, device=device))

        nn.init.xavier_uniform_(self.Wf)
        nn.init.xavier_uniform_(self.Wi)
        nn.init.xavier_uniform_(self.Wc)
        nn.init.xavier_uniform_(self.Wo)

        self.bf = nn.Parameter(torch.zeros(hidden_size, device=device))
        self.bi = nn.Parameter(torch.zeros(hidden_size, device=device))
        self.bc = nn.Parameter(torch.zeros(hidden_size, device=device))
        self.bo = nn.Parameter(torch.zeros(hidden_size, device=device))

        # Initialise the forget-gate bias to 1 so the cell remembers by default.
        nn.init.constant_(self.bf, 1.0)

    def forward(self, x_t: torch.Tensor, h_prev: torch.Tensor, c_prev: torch.Tensor) -> tuple:
        # x_t: (batch, input_size), h_prev / c_prev: (batch, hidden_size)
        concat = torch.cat((x_t, h_prev), dim=1)

        # Forget gate: what to keep from the previous cell state.
        f_t = torch.sigmoid(torch.matmul(concat, self.Wf) + self.bf)

        # Input gate: how much of the candidate to write.
        i_t = torch.sigmoid(torch.matmul(concat, self.Wi) + self.bi)

        # Candidate cell state.
        c_hat_t = torch.tanh(torch.matmul(concat, self.Wc) + self.bc)

        # New cell state.
        c_t = f_t * c_prev + i_t * c_hat_t

        # Output gate and new hidden state.
        o_t = torch.sigmoid(torch.matmul(concat, self.Wo) + self.bo)
        h_t = o_t * torch.tanh(c_t)

        return h_t, c_t


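# The three custom cells above all consume one timestep at a time: a
# (batch, input_size) input plus the previous (batch, hidden_size) state(s).
# The helper below is a minimal, hypothetical shape check (not part of the
# original module); the sizes are illustrative and it is never called at
# import time.
def _check_custom_cell_shapes(batch_size: int = 4, input_size: int = 8, hidden_size: int = 16):
    x = torch.randn(batch_size, input_size)
    h0 = torch.zeros(batch_size, hidden_size)
    c0 = torch.zeros(batch_size, hidden_size)

    rnn_h = custom_RNNCell(input_size, hidden_size)(x, h0)
    gru_h = custom_GRUCell(input_size, hidden_size)(x, h0)
    lstm_h, lstm_c = custom_LSTMCell(input_size, hidden_size)(x, h0, c0)

    # Every cell should return (batch_size, hidden_size) tensors.
    assert rnn_h.shape == gru_h.shape == lstm_h.shape == lstm_c.shape == (batch_size, hidden_size)

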
class RecurrentLayer(nn.Module):
    """Bidirectional recurrent layer that unrolls a single cell over time.

    The same cell (and therefore the same weights) is shared between the
    forward and the backward direction.
    """

    def __init__(self, input_size: int, hidden_size: int, cell_type: str = 'RNN', device='cpu'):
        super(RecurrentLayer, self).__init__()
        self.hidden_size = hidden_size
        self.device = device
        self.cell_type = cell_type

        # The forward pass steps the cell one timestep at a time, so the
        # built-in variants must be the *Cell classes rather than the full
        # (already bidirectional) nn.RNN / nn.GRU / nn.LSTM layers.
        if cell_type == 'RNN':
            self.cell = nn.RNNCell(input_size, hidden_size)
        elif cell_type == 'custom_RNN':
            self.cell = custom_RNNCell(input_size, hidden_size, device)
        elif cell_type == 'GRU':
            self.cell = nn.GRUCell(input_size, hidden_size)
        elif cell_type == 'custom_GRU':
            self.cell = custom_GRUCell(input_size, hidden_size, device)
        elif cell_type == 'LSTM':
            self.cell = nn.LSTMCell(input_size, hidden_size)
        elif cell_type == 'custom_LSTM':
            self.cell = custom_LSTMCell(input_size, hidden_size, device)
        else:
            raise ValueError("Unsupported cell type")

    def forward(self, inputs: torch.Tensor) -> tuple:
        # inputs: (batch, seq_len, input_size)
        batch_size, seq_len, _ = inputs.shape
        # Create the initial states on the same device as the inputs so the
        # layer keeps working after the model has been moved with .to(device).
        device = inputs.device
        is_lstm = self.cell_type in ('LSTM', 'custom_LSTM')

        # Forward direction.
        h = torch.zeros(batch_size, self.hidden_size, device=device)
        c = torch.zeros(batch_size, self.hidden_size, device=device) if is_lstm else None
        forward_outputs = []
        for t in range(seq_len):
            if self.cell_type == 'custom_LSTM':
                h, c = self.cell(inputs[:, t], h, c)
            elif self.cell_type == 'LSTM':
                h, c = self.cell(inputs[:, t], (h, c))
            else:
                h = self.cell(inputs[:, t], h)
            forward_outputs.append(h)

        # Backward direction; states are inserted at the front so that
        # backward_outputs[t] corresponds to timestep t.
        h = torch.zeros(batch_size, self.hidden_size, device=device)
        c = torch.zeros(batch_size, self.hidden_size, device=device) if is_lstm else None
        backward_outputs = []
        for t in range(seq_len - 1, -1, -1):
            if self.cell_type == 'custom_LSTM':
                h, c = self.cell(inputs[:, t], h, c)
            elif self.cell_type == 'LSTM':
                h, c = self.cell(inputs[:, t], (h, c))
            else:
                h = self.cell(inputs[:, t], h)
            backward_outputs.insert(0, h)

        # (batch, seq_len, 2 * hidden_size), matching a bidirectional layer.
        forward_output = torch.stack(forward_outputs, dim=1)
        backward_output = torch.stack(backward_outputs, dim=1)
        output = torch.cat((forward_output, backward_output), dim=2)

        # Final states, shape (2, batch, hidden_size): the forward pass ends at
        # the last timestep, the backward pass ends at the first one.
        final_hidden = torch.stack([forward_outputs[-1], backward_outputs[0]], dim=0)

        return output, final_hidden


class Attention(nn.Module):
    """Additive (Bahdanau-style) attention over the encoder outputs."""

    def __init__(self, hidden_size):
        super(Attention, self).__init__()
        self.W1 = nn.Linear(hidden_size, hidden_size)
        self.W2 = nn.Linear(hidden_size, hidden_size)
        self.v = nn.Linear(hidden_size, 1, bias=False)

    def forward(self, hidden, encoder_outputs):
        # hidden: (batch, hidden_size), encoder_outputs: (batch, seq_len, hidden_size)
        sequence_len = encoder_outputs.shape[1]
        hidden = hidden.unsqueeze(1).repeat(1, sequence_len, 1)

        # Alignment scores for every timestep, normalised over the sequence.
        energy = torch.tanh(self.W1(encoder_outputs) + self.W2(hidden))
        attention = self.v(energy).squeeze(2)
        attention_weights = torch.softmax(attention, dim=1)

        # Context vector: attention-weighted sum of the encoder outputs.
        context = torch.bmm(attention_weights.unsqueeze(1), encoder_outputs).squeeze(1)
        return context, attention_weights


class SimpleRecurrentNetworkWithAttention(nn.Module):
    """Embedding -> bidirectional recurrent encoder -> attention -> linear classifier."""

    def __init__(self, input_size, hidden_size, output_size, cell_type='RNN'):
        super(SimpleRecurrentNetworkWithAttention, self).__init__()

        self.embedding = nn.Embedding(input_size, hidden_size)
        # The encoder is bidirectional, so attention operates on 2 * hidden_size features.
        self.attention = Attention(hidden_size * 2)
        self.cell_type = cell_type

        if cell_type == 'RNN':
            self.cell = nn.RNN(hidden_size, hidden_size, batch_first=True, bidirectional=True)
        elif cell_type == 'custom_RNN':
            self.cell = RecurrentLayer(hidden_size, hidden_size, cell_type="custom_RNN")
        elif cell_type == 'GRU':
            self.cell = nn.GRU(hidden_size, hidden_size, batch_first=True, bidirectional=True)
        elif cell_type == 'custom_GRU':
            self.cell = RecurrentLayer(hidden_size, hidden_size, cell_type="custom_GRU")
        elif cell_type == 'LSTM':
            self.cell = nn.LSTM(hidden_size, hidden_size, batch_first=True, bidirectional=True)
        elif cell_type == 'custom_LSTM':
            self.cell = RecurrentLayer(hidden_size, hidden_size, cell_type="custom_LSTM")
        else:
            raise ValueError("Unsupported cell type. Choose from 'RNN', 'custom_RNN', 'GRU', "
                             "'custom_GRU', 'LSTM' or 'custom_LSTM'.")

        self.fc = nn.Linear(hidden_size * 2, output_size)

    def forward(self, inputs):
        # inputs: (batch, seq_len) token indices.
        embedded = self.embedding(inputs)
        rnn_output, hidden = self.cell(embedded)

        # nn.LSTM returns (h_n, c_n); keep only the hidden state.
        if isinstance(hidden, tuple):
            hidden = hidden[0]

        # Concatenate the final forward and backward states: (batch, 2 * hidden_size).
        hidden = torch.cat((hidden[-2], hidden[-1]), dim=1)

        # Attend over the encoder outputs using the final hidden state as the query.
        context, attention_weights = self.attention(hidden, rnn_output)

        # Project the context vector to class logits.
        output = self.fc(context)

        return output, attention_weights
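

# Minimal smoke test, not part of the original module: builds the model with a
# custom GRU encoder and runs one forward pass on random token indices.  The
# vocabulary size, batch size, sequence length, and class count below are
# illustrative placeholders.
if __name__ == "__main__":
    vocab_size, hidden_size, num_classes = 1000, 64, 2
    model = SimpleRecurrentNetworkWithAttention(vocab_size, hidden_size, num_classes,
                                                cell_type='custom_GRU')

    batch = torch.randint(0, vocab_size, (8, 20))  # (batch, seq_len) token ids
    logits, attn = model(batch)

    print(logits.shape)  # expected: torch.Size([8, 2])
    print(attn.shape)    # expected: torch.Size([8, 20])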