import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers.models.bert.modeling_bert import BertPreTrainedModel, BertModel
from transformers.models.roberta.modeling_roberta import RobertaPreTrainedModel, RobertaModel
from transformers.models.albert.modeling_albert import AlbertPreTrainedModel, AlbertModel
from transformers.models.megatron_bert.modeling_megatron_bert import MegatronBertPreTrainedModel, MegatronBertModel
from transformers.modeling_outputs import TokenClassifierOutput
from torch.nn import CrossEntropyLoss

from loss.focal_loss import FocalLoss
from loss.label_smoothing import LabelSmoothingCrossEntropy
from models.basic_modules.crf import CRF
from tools.model_utils.parameter_freeze import ParameterFreeze
from tools.runner_utils.log_util import logging

logger = logging.getLogger(__name__)

freezer = ParameterFreeze()
""" | |
BERT for token-level classification with softmax head. | |
""" | |
class BertSoftmaxForSequenceLabeling(BertPreTrainedModel): | |
def __init__(self, config): | |
super(BertSoftmaxForSequenceLabeling, self).__init__(config) | |
self.num_labels = config.num_labels | |
self.bert = BertModel(config) | |
if self.config.use_freezing: | |
self.bert = freezer.freeze_lm(self.bert) | |
self.dropout = nn.Dropout(config.hidden_dropout_prob) | |
self.classifier = nn.Linear(config.hidden_size, config.num_labels) | |
self.loss_type = config.loss_type | |
self.init_weights() | |

    def forward(
        self,
        input_ids,
        attention_mask=None,
        token_type_ids=None,
        position_ids=None,
        head_mask=None,
        labels=None,
        return_dict=False,
    ):
        outputs = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            return_dict=return_dict,
        )
        sequence_output = outputs[0]
        sequence_output = self.dropout(sequence_output)
        logits = self.classifier(sequence_output)

        loss = None
        if labels is not None:
            assert self.loss_type in ["lsr", "focal", "ce"]
            if self.loss_type == "lsr":
                loss_fct = LabelSmoothingCrossEntropy(ignore_index=0)
            elif self.loss_type == "focal":
                loss_fct = FocalLoss(ignore_index=0)
            else:
                loss_fct = CrossEntropyLoss(ignore_index=0)
            # Only keep the active parts of the loss (tokens covered by the attention mask).
            if attention_mask is not None:
                active_loss = attention_mask.view(-1) == 1
                active_logits = logits.view(-1, self.num_labels)[active_loss]
                active_labels = labels.view(-1)[active_loss]
                loss = loss_fct(active_logits, active_labels)
            else:
                loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))

        if not return_dict:
            output = (logits,) + outputs[2:]  # add hidden states and attentions if they are here
            return ((loss,) + output) if loss is not None else output  # (loss), scores, (hidden_states), (attentions)

        return TokenClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )
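
# Usage sketch (illustrative only, not part of this repo's API): the softmax heads above
# expect `config.loss_type` in {"lsr", "focal", "ce"} and a boolean `config.use_freezing`
# to be set on the config before construction. The checkpoint name and label count below
# are placeholders.
#
#   from transformers import BertConfig, BertTokenizerFast
#
#   config = BertConfig.from_pretrained("bert-base-cased", num_labels=9)
#   config.loss_type = "ce"         # or "focal" / "lsr"
#   config.use_freezing = False     # freeze the backbone via ParameterFreeze if True
#   model = BertSoftmaxForSequenceLabeling.from_pretrained("bert-base-cased", config=config)
#
#   tokenizer = BertTokenizerFast.from_pretrained("bert-base-cased")
#   batch = tokenizer(["token level classification"], return_tensors="pt")
#   logits = model(**batch)[0]      # (batch, seq_len, num_labels); pass `labels=` to also get a loss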
""" | |
RoBERTa for token-level classification with softmax head. | |
""" | |
class RobertaSoftmaxForSequenceLabeling(RobertaPreTrainedModel): | |
def __init__(self, config): | |
super(RobertaSoftmaxForSequenceLabeling, self).__init__(config) | |
self.num_labels = config.num_labels | |
self.roberta = RobertaModel(config) | |
if self.config.use_freezing: | |
self.roberta = freezer.freeze_lm(self.roberta) | |
self.dropout = nn.Dropout(config.hidden_dropout_prob) | |
self.classifier = nn.Linear(config.hidden_size, config.num_labels) | |
self.loss_type = config.loss_type | |
self.init_weights() | |

    def forward(
        self,
        input_ids,
        attention_mask=None,
        token_type_ids=None,
        position_ids=None,
        head_mask=None,
        labels=None,
        return_dict=False,
    ):
        outputs = self.roberta(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            return_dict=return_dict,
        )
        sequence_output = outputs[0]
        sequence_output = self.dropout(sequence_output)
        logits = self.classifier(sequence_output)

        loss = None
        if labels is not None:
            assert self.loss_type in ["lsr", "focal", "ce"]
            if self.loss_type == "lsr":
                loss_fct = LabelSmoothingCrossEntropy(ignore_index=0)
            elif self.loss_type == "focal":
                loss_fct = FocalLoss(ignore_index=0)
            else:
                loss_fct = CrossEntropyLoss(ignore_index=0)
            # Only keep the active parts of the loss (tokens covered by the attention mask).
            if attention_mask is not None:
                active_loss = attention_mask.view(-1) == 1
                active_logits = logits.view(-1, self.num_labels)[active_loss]
                active_labels = labels.view(-1)[active_loss]
                loss = loss_fct(active_logits, active_labels)
            else:
                loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))

        if not return_dict:
            output = (logits,) + outputs[2:]  # add hidden states and attentions if they are here
            return ((loss,) + output) if loss is not None else output  # (loss), scores, (hidden_states), (attentions)

        return TokenClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )
""" | |
ALBERT for token-level classification with softmax head. | |
""" | |
class AlbertSoftmaxForSequenceLabeling(AlbertPreTrainedModel): | |
def __init__(self, config): | |
super(AlbertSoftmaxForSequenceLabeling, self).__init__(config) | |
self.num_labels = config.num_labels | |
self.loss_type = config.loss_type | |
self.bert = AlbertModel(config) | |
if self.config.use_freezing: | |
self.bert = freezer.freeze_lm(self.bert) | |
self.dropout = nn.Dropout(config.hidden_dropout_prob) | |
self.classifier = nn.Linear(config.hidden_size, config.num_labels) | |
self.init_weights() | |

    def forward(
        self,
        input_ids,
        attention_mask=None,
        token_type_ids=None,
        position_ids=None,
        head_mask=None,
        labels=None,
        return_dict=False,
    ):
        outputs = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            return_dict=return_dict,
        )
        sequence_output = outputs[0]
        sequence_output = self.dropout(sequence_output)
        logits = self.classifier(sequence_output)

        loss = None
        if labels is not None:
            assert self.loss_type in ["lsr", "focal", "ce"]
            if self.loss_type == "lsr":
                loss_fct = LabelSmoothingCrossEntropy(ignore_index=0)
            elif self.loss_type == "focal":
                loss_fct = FocalLoss(ignore_index=0)
            else:
                loss_fct = CrossEntropyLoss(ignore_index=0)
            # Only keep the active parts of the loss (tokens covered by the attention mask).
            if attention_mask is not None:
                active_loss = attention_mask.view(-1) == 1
                active_logits = logits.view(-1, self.num_labels)[active_loss]
                active_labels = labels.view(-1)[active_loss]
                loss = loss_fct(active_logits, active_labels)
            else:
                loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))

        if not return_dict:
            output = (logits,) + outputs[2:]  # add hidden states and attentions if they are here
            return ((loss,) + output) if loss is not None else output  # (loss), scores, (hidden_states), (attentions)

        return TokenClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )
""" | |
MegatronBERT for token-level classification with softmax head. | |
""" | |
class MegatronBertSoftmaxForSequenceLabeling(MegatronBertPreTrainedModel): | |
def __init__(self, config): | |
super(MegatronBertSoftmaxForSequenceLabeling, self).__init__(config) | |
self.num_labels = config.num_labels | |
self.bert = MegatronBertModel(config) | |
if self.config.use_freezing: | |
self.bert = freezer.freeze_lm(self.bert) | |
self.dropout = nn.Dropout(config.hidden_dropout_prob) | |
self.classifier = nn.Linear(config.hidden_size, config.num_labels) | |
self.loss_type = config.loss_type | |
self.init_weights() | |

    def forward(
        self,
        input_ids,
        attention_mask=None,
        token_type_ids=None,
        position_ids=None,
        head_mask=None,
        labels=None,
        return_dict=False,
    ):
        outputs = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            return_dict=return_dict,
        )
        sequence_output = outputs[0]
        sequence_output = self.dropout(sequence_output)
        logits = self.classifier(sequence_output)

        loss = None
        if labels is not None:
            assert self.loss_type in ["lsr", "focal", "ce"]
            if self.loss_type == "lsr":
                loss_fct = LabelSmoothingCrossEntropy(ignore_index=0)
            elif self.loss_type == "focal":
                loss_fct = FocalLoss(ignore_index=0)
            else:
                loss_fct = CrossEntropyLoss(ignore_index=0)
            # Only keep the active parts of the loss (tokens covered by the attention mask).
            if attention_mask is not None:
                active_loss = attention_mask.view(-1) == 1
                active_logits = logits.view(-1, self.num_labels)[active_loss]
                active_labels = labels.view(-1)[active_loss]
                loss = loss_fct(active_logits, active_labels)
            else:
                loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))

        if not return_dict:
            output = (logits,) + outputs[2:]  # add hidden states and attentions if they are here
            return ((loss,) + output) if loss is not None else output  # (loss), scores, (hidden_states), (attentions)

        return TokenClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )
""" | |
BERT for token-level classification with CRF head. | |
""" | |
class BertCrfForSequenceLabeling(BertPreTrainedModel): | |
def __init__(self, config): | |
super(BertCrfForSequenceLabeling, self).__init__(config) | |
self.bert = BertModel(config) | |
if self.config.use_freezing: | |
self.bert = freezer.freeze_lm(self.bert) | |
self.dropout = nn.Dropout(config.hidden_dropout_prob) | |
self.classifier = nn.Linear(config.hidden_size, config.num_labels) | |
self.crf = CRF(num_tags=config.num_labels, batch_first=True) | |
self.init_weights() | |

    def forward(
        self,
        input_ids,
        attention_mask=None,
        token_type_ids=None,
        position_ids=None,
        head_mask=None,
        labels=None,
        return_dict=False,
    ):
        outputs = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            return_dict=return_dict,
        )
        sequence_output = outputs[0]
        sequence_output = self.dropout(sequence_output)
        logits = self.classifier(sequence_output)

        loss = None
        if labels is not None:
            # The CRF returns a log-likelihood, so negate it to obtain a loss to minimize.
            loss = -1 * self.crf(emissions=logits, tags=labels, mask=attention_mask)

        if not return_dict:
            output = (logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output  # (loss), scores, (hidden_states), (attentions)

        return TokenClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )
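
# Inference sketch for the CRF heads (illustrative): the loss above follows the torchcrf-style
# convention where calling the CRF returns a log-likelihood. Assuming the local
# `models.basic_modules.crf.CRF` also exposes a torchcrf-style `decode`, Viterbi decoding of
# the emitted scores would look roughly like this:
#
#   model.eval()
#   with torch.no_grad():
#       logits = model(input_ids=input_ids, attention_mask=attention_mask)[0]
#       # `decode` is assumed to return one best tag sequence per example.
#       best_paths = model.crf.decode(logits, mask=attention_mask)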
""" | |
RoBERTa for token-level classification with CRF head. | |
""" | |
class RobertaCrfForSequenceLabeling(RobertaPreTrainedModel): | |
def __init__(self, config): | |
super(RobertaCrfForSequenceLabeling, self).__init__(config) | |
self.roberta = RobertaModel(config) | |
if self.config.use_freezing: | |
self.roberta = freezer.freeze_lm(self.roberta) | |
self.dropout = nn.Dropout(config.hidden_dropout_prob) | |
self.classifier = nn.Linear(config.hidden_size, config.num_labels) | |
self.crf = CRF(num_tags=config.num_labels, batch_first=True) | |
self.init_weights() | |

    def forward(
        self,
        input_ids,
        attention_mask=None,
        token_type_ids=None,
        position_ids=None,
        head_mask=None,
        labels=None,
        return_dict=False,
    ):
        outputs = self.roberta(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            return_dict=return_dict,
        )
        sequence_output = outputs[0]
        sequence_output = self.dropout(sequence_output)
        logits = self.classifier(sequence_output)

        loss = None
        if labels is not None:
            # The CRF returns a log-likelihood, so negate it to obtain a loss to minimize.
            loss = -1 * self.crf(emissions=logits, tags=labels, mask=attention_mask)

        if not return_dict:
            output = (logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output  # (loss), scores, (hidden_states), (attentions)

        return TokenClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )
""" | |
ALBERT for token-level classification with CRF head. | |
""" | |
class AlbertCrfForSequenceLabeling(AlbertPreTrainedModel): | |
def __init__(self, config): | |
super(AlbertCrfForSequenceLabeling, self).__init__(config) | |
self.bert = AlbertModel(config) | |
if self.config.use_freezing: | |
self.bert = freezer.freeze_lm(self.bert) | |
self.dropout = nn.Dropout(config.hidden_dropout_prob) | |
self.classifier = nn.Linear(config.hidden_size, config.num_labels) | |
self.crf = CRF(num_tags=config.num_labels, batch_first=True) | |
self.init_weights() | |

    def forward(
        self,
        input_ids,
        attention_mask=None,
        token_type_ids=None,
        position_ids=None,
        head_mask=None,
        labels=None,
        return_dict=False,
    ):
        outputs = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            return_dict=return_dict,
        )
        sequence_output = outputs[0]
        sequence_output = self.dropout(sequence_output)
        logits = self.classifier(sequence_output)

        loss = None
        if labels is not None:
            # The CRF returns a log-likelihood, so negate it to obtain a loss to minimize.
            loss = -1 * self.crf(emissions=logits, tags=labels, mask=attention_mask)

        if not return_dict:
            output = (logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output  # (loss), scores, (hidden_states), (attentions)

        return TokenClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )
""" | |
MegatronBERT for token-level classification with CRF head. | |
""" | |
class MegatronBertCrfForSequenceLabeling(MegatronBertPreTrainedModel): | |
def __init__(self, config): | |
super(MegatronBertCrfForSequenceLabeling, self).__init__(config) | |
self.bert = MegatronBertModel(config) | |
if self.config.use_freezing: | |
self.bert = freezer.freeze_lm(self.bert) | |
self.dropout = nn.Dropout(config.hidden_dropout_prob) | |
self.classifier = nn.Linear(config.hidden_size, config.num_labels) | |
self.crf = CRF(num_tags=config.num_labels, batch_first=True) | |
self.init_weights() | |

    def forward(
        self,
        input_ids,
        attention_mask=None,
        token_type_ids=None,
        position_ids=None,
        head_mask=None,
        labels=None,
        return_dict=False,
    ):
        outputs = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            return_dict=return_dict,
        )
        sequence_output = outputs[0]
        sequence_output = self.dropout(sequence_output)
        logits = self.classifier(sequence_output)

        loss = None
        if labels is not None:
            # The CRF returns a log-likelihood, so negate it to obtain a loss to minimize.
            loss = -1 * self.crf(emissions=logits, tags=labels, mask=attention_mask)

        if not return_dict:
            output = (logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output  # (loss), scores, (hidden_states), (attentions)

        return TokenClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )
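

# Minimal smoke-test sketch (assumes only this repo and `transformers` are installed; it builds
# a randomly initialized model, so no checkpoint is downloaded). The config attributes
# `use_freezing` and `loss_type` are required by the classes above; the sizes below are arbitrary.
if __name__ == "__main__":
    from transformers import BertConfig

    config = BertConfig(hidden_size=128, num_hidden_layers=2, num_attention_heads=2,
                        intermediate_size=256, num_labels=5)
    config.use_freezing = False
    config.loss_type = "ce"
    model = BertSoftmaxForSequenceLabeling(config)

    input_ids = torch.randint(0, config.vocab_size, (2, 16))
    attention_mask = torch.ones_like(input_ids)
    labels = torch.randint(1, config.num_labels, (2, 16))  # avoid ignore_index=0 in the sketch
    loss, logits = model(input_ids, attention_mask=attention_mask, labels=labels)[:2]
    logger.info("loss=%s logits_shape=%s", loss.item(), tuple(logits.shape))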