emanuelaboros's picture
Create handler.py
6ff6c37 verified
raw
history blame
5.31 kB
import torch
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
from typing import List, Dict, Any
import requests
import nltk
# Download required NLTK models
nltk.download("averaged_perceptron_tagger")
nltk.download("averaged_perceptron_tagger_eng")
# Define your model name
NEL_MODEL = "nel-mgenre-multilingual"
class NelPipeline:
def __init__(self, model_name: str):
self.model_name = model_name
self.device = "cuda" if torch.cuda.is_available() else "cpu"
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.model = AutoModelForSeq2SeqLM.from_pretrained(model_name).to(self.device)
def preprocess(self, text: str):
start_token = "[START]"
end_token = "[END]"
if start_token in text and end_token in text:
start_idx = text.index(start_token) + len(start_token)
end_idx = text.index(end_token)
enclosed_entity = text[start_idx:end_idx].strip()
lOffset = start_idx
rOffset = end_idx
else:
enclosed_entity = None
lOffset = None
rOffset = None
outputs = self.model.generate(
**self.tokenizer([text], return_tensors="pt").to(self.device),
num_beams=1,
num_return_sequences=1,
max_new_tokens=30,
return_dict_in_generate=True,
output_scores=True,
)
wikipedia_prediction = self.tokenizer.batch_decode(
outputs.sequences, skip_special_tokens=True
)[0]
transition_scores = self.model.compute_transition_scores(
outputs.sequences, outputs.scores, normalize_logits=True
)
log_prob_sum = sum(transition_scores[0])
sequence_confidence = torch.exp(log_prob_sum)
percentage = sequence_confidence.cpu().numpy() * 100.0
return wikipedia_prediction, enclosed_entity, lOffset, rOffset, percentage
def postprocess(self, outputs):
wikipedia_prediction, enclosed_entity, lOffset, rOffset, percentage = outputs
qid, language = get_wikipedia_page_props(wikipedia_prediction)
title, url = get_wikipedia_title(qid, language=language)
results = [
{
"surface": enclosed_entity,
"wkd_id": qid,
"wkpedia_pagename": title,
"wkpedia_url": url,
"type": "UNK",
"confidence_nel": round(percentage, 2),
"lOffset": lOffset,
"rOffset": rOffset,
}
]
return results
def get_wikipedia_page_props(input_str: str):
if ">>" not in input_str:
page_name = input_str
language = "en"
else:
try:
page_name, language = input_str.split(">>")
page_name = page_name.strip()
language = language.strip()
except:
page_name = input_str
language = "en"
wikipedia_url = f"https://{language}.wikipedia.org/w/api.php"
wikipedia_params = {
"action": "query",
"prop": "pageprops",
"format": "json",
"titles": page_name,
}
qid = "NIL"
try:
response = requests.get(wikipedia_url, params=wikipedia_params)
response.raise_for_status()
data = response.json()
if "pages" in data["query"]:
page_id = list(data["query"]["pages"].keys())[0]
if "pageprops" in data["query"]["pages"][page_id]:
page_props = data["query"]["pages"][page_id]["pageprops"]
if "wikibase_item" in page_props:
return page_props["wikibase_item"], language
else:
return qid, language
else:
return qid, language
else:
return qid, language
except Exception as e:
return qid, language
def get_wikipedia_title(qid, language="en"):
url = f"https://www.wikidata.org/w/api.php"
params = {
"action": "wbgetentities",
"format": "json",
"ids": qid,
"props": "sitelinks/urls",
"sitefilter": f"{language}wiki",
}
response = requests.get(url, params=params)
try:
response.raise_for_status()
data = response.json()
except requests.exceptions.RequestException as e:
return "NIL", "None"
except ValueError as e:
return "NIL", "None"
try:
title = data["entities"][qid]["sitelinks"][f"{language}wiki"]["title"]
url = data["entities"][qid]["sitelinks"][f"{language}wiki"]["url"]
return title, url
except KeyError:
return "NIL", "None"
class EndpointHandler:
def __init__(self, path: str = None):
# Initialize the NelPipeline with the specified model
self.pipeline = NelPipeline(NEL_MODEL)
def __call__(self, data: Dict[str, Any]) -> List[Dict[str, Any]]:
# Process incoming data
inputs = data.get("inputs", "")
if not isinstance(inputs, str):
raise ValueError("Input must be a string.")
# Preprocess, forward, and postprocess
preprocessed = self.pipeline.preprocess(inputs)
results = self.pipeline.postprocess(preprocessed)
return results