# import os
# os.environ['CUDA_VISIBLE_DEVICES'] = '0'  # uncomment to pin a single GPU

from dataset_with_embeddings import datasetx
from transformers import AutoModel, AutoModelForMultipleChoice, AutoTokenizer
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
import torch
import pandas as pd


GENERATOR_MODEL = "JosueElias/pipeline_generator_model"
GENERATOR_TOKENIZER = "JosueElias/pipeline_generator_tokenizer"
QUERY_MODEL = "JosueElias/pipeline_query_model"
QUERY_TOKENIZER = "JosueElias/pipeline_query_tokenizer"

DEVICE = "cpu"  # "cpu" or "cuda"

class Pipeline:
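    """Retriever + generator pipeline for multiple-choice question answering.

    The query model embeds a question and retrieves the nearest passages
    from `datasetx`; the generator model then scores each of the five
    answer options (A-E) against the retrieved context.
    """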
    
    #---- init class
    
    def __init__(self):
        # Generator: multiple-choice model that scores candidate answers.
        self.model = AutoModelForMultipleChoice.from_pretrained(GENERATOR_MODEL)
        self.tokenizer = AutoTokenizer.from_pretrained(GENERATOR_TOKENIZER)
        # Query model: sentence-embedding model used by the retriever.
        self.semModel = AutoModel.from_pretrained(QUERY_MODEL)
        self.semTokenizer = AutoTokenizer.from_pretrained(QUERY_TOKENIZER)
        self.device = torch.device(DEVICE)

        self.semModel.to(self.device)
        self.model.to(self.device)
    
    #---- utils functions
    
    def convert_to_letter(self, a):
        # Map a class index (0-4) to its option letter.
        return "ABCDE"[a]
    
    def filter_stopwords(self, example_sent):
        stop_words = set(stopwords.words('english'))
        word_tokens = word_tokenize(example_sent)
        filtered_sentence = [w for w in word_tokens if w.lower() not in stop_words]
        return " ".join(filtered_sentence)
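    # Note: filter_stopwords needs the NLTK "stopwords" and "punkt" resources,
    # which can be fetched once with, e.g.:
    #   import nltk; nltk.download("stopwords"); nltk.download("punkt")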
    
    def cls_pooling(self, model_output):
        # Pool with the model's pooler output; model_output.last_hidden_state[:, 0]
        # would use the raw [CLS] hidden state instead.
        return model_output.pooler_output

    def get_embeddings(self, text_list):
        encoded_input = self.semTokenizer(
            text_list, padding=True, truncation=True, return_tensors="pt", max_length=512
        )
        encoded_input = {k: v.to(self.device) for k, v in encoded_input.items()}
        model_output = self.semModel(**encoded_input)
        return self.cls_pooling(model_output)
        
    #---- retriever
    
    def get_context_from_text(self, question):
        question_embedding = self.get_embeddings([question]).cpu().detach().numpy()
        scores, samples = datasetx.get_nearest_examples(
            "embeddings", question_embedding, k=5
        )
        samples_df = pd.DataFrame.from_dict(samples)
        samples_df["scores"] = scores
        samples_df.sort_values("scores", ascending=False, inplace=True)
        # Concatenate the retrieved passages into a single context string.
        contexts = ""
        for _, row in samples_df.iterrows():
            contexts = contexts + f"=={row.section}== {row.text} "
        return contexts

    #---- generator
    
    # Each tokenized pair is laid out as:
    #   [CLS] context #### question? [SEP] answer [SEP]
    def create_tokens(self, question_and_options, context):
        question = question_and_options["prompt"]
        # One candidate string per answer option A-E.
        candidates = [
            "#### " + question + " [SEP] " + question_and_options[letter] + " [SEP]"
            for letter in "ABCDE"
        ]
        prompt = "[CLS]" + context

        inputs = self.tokenizer(
            [[prompt, candidate] for candidate in candidates],
            return_tensors="pt",
            padding=True,
            truncation="only_first",  # truncate the long context, never the answer
            max_length=512,
            add_special_tokens=False,
        )
        # Dummy label so the model call succeeds; the loss itself is unused.
        labels = torch.tensor(0).unsqueeze(0)
        return (inputs, labels)
        
    def infer_answer(self, inputs_and_labels):
        (inputs, labels) = inputs_and_labels

        inputs = {k: v.to(self.device) for k, v in inputs.items()}
        labels = labels.to(self.device)

        # Add a batch dimension and rank the five options by logit.
        outputs = self.model(**{k: v.unsqueeze(0) for k, v in inputs.items()}, labels=labels)
        logits = outputs.logits
        _, topk_indices = torch.topk(logits, k=3, dim=1)
        return topk_indices
    
    #---- retriever + generator

    def give_the_best_answer(self, dict_with_all_the_info):
        context = self.get_context_from_text(dict_with_all_the_info["prompt"])
        tokenized = self.create_tokens(dict_with_all_the_info, context)
        topk_indices = self.infer_answer(tokenized)
        # Return the letter of the highest-scoring option.
        return self.convert_to_letter(int(topk_indices[0][0]))
    
pipeline = Pipeline()
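

if __name__ == "__main__":
    # Minimal usage sketch. The dict keys ("prompt", "A".."E") match what
    # create_tokens expects; the question and options below are illustrative
    # placeholders, not part of the original pipeline.
    sample_question = {
        "prompt": "Which gas makes up most of Earth's atmosphere?",
        "A": "Oxygen",
        "B": "Nitrogen",
        "C": "Carbon dioxide",
        "D": "Hydrogen",
        "E": "Argon",
    }
    print(pipeline.give_the_best_answer(sample_question))  # e.g. "B"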