import torch from transformers import AutoTokenizer, AutoModelForSeq2SeqLM from typing import List, Dict, Any import requests import nltk # Download required NLTK models nltk.download("averaged_perceptron_tagger") nltk.download("averaged_perceptron_tagger_eng") # Define your model name NEL_MODEL = "nel-mgenre-multilingual" class NelPipeline: def __init__(self, model_name: str): self.model_name = model_name self.device = "cuda" if torch.cuda.is_available() else "cpu" self.tokenizer = AutoTokenizer.from_pretrained(model_name) self.model = AutoModelForSeq2SeqLM.from_pretrained(model_name).to(self.device) def preprocess(self, text: str): start_token = "[START]" end_token = "[END]" if start_token in text and end_token in text: start_idx = text.index(start_token) + len(start_token) end_idx = text.index(end_token) enclosed_entity = text[start_idx:end_idx].strip() lOffset = start_idx rOffset = end_idx else: enclosed_entity = None lOffset = None rOffset = None outputs = self.model.generate( **self.tokenizer([text], return_tensors="pt").to(self.device), num_beams=1, num_return_sequences=1, max_new_tokens=30, return_dict_in_generate=True, output_scores=True, ) wikipedia_prediction = self.tokenizer.batch_decode( outputs.sequences, skip_special_tokens=True )[0] transition_scores = self.model.compute_transition_scores( outputs.sequences, outputs.scores, normalize_logits=True ) log_prob_sum = sum(transition_scores[0]) sequence_confidence = torch.exp(log_prob_sum) percentage = sequence_confidence.cpu().numpy() * 100.0 return wikipedia_prediction, enclosed_entity, lOffset, rOffset, percentage def postprocess(self, outputs): wikipedia_prediction, enclosed_entity, lOffset, rOffset, percentage = outputs qid, language = get_wikipedia_page_props(wikipedia_prediction) title, url = get_wikipedia_title(qid, language=language) results = [ { "surface": enclosed_entity, "wkd_id": qid, "wkpedia_pagename": title, "wkpedia_url": url, "type": "UNK", "confidence_nel": round(percentage, 2), "lOffset": lOffset, "rOffset": rOffset, } ] return results def get_wikipedia_page_props(input_str: str): if ">>" not in input_str: page_name = input_str language = "en" else: try: page_name, language = input_str.split(">>") page_name = page_name.strip() language = language.strip() except: page_name = input_str language = "en" wikipedia_url = f"https://{language}.wikipedia.org/w/api.php" wikipedia_params = { "action": "query", "prop": "pageprops", "format": "json", "titles": page_name, } qid = "NIL" try: response = requests.get(wikipedia_url, params=wikipedia_params) response.raise_for_status() data = response.json() if "pages" in data["query"]: page_id = list(data["query"]["pages"].keys())[0] if "pageprops" in data["query"]["pages"][page_id]: page_props = data["query"]["pages"][page_id]["pageprops"] if "wikibase_item" in page_props: return page_props["wikibase_item"], language else: return qid, language else: return qid, language else: return qid, language except Exception as e: return qid, language def get_wikipedia_title(qid, language="en"): url = f"https://www.wikidata.org/w/api.php" params = { "action": "wbgetentities", "format": "json", "ids": qid, "props": "sitelinks/urls", "sitefilter": f"{language}wiki", } response = requests.get(url, params=params) try: response.raise_for_status() data = response.json() except requests.exceptions.RequestException as e: return "NIL", "None" except ValueError as e: return "NIL", "None" try: title = data["entities"][qid]["sitelinks"][f"{language}wiki"]["title"] url = data["entities"][qid]["sitelinks"][f"{language}wiki"]["url"] return title, url except KeyError: return "NIL", "None" class EndpointHandler: def __init__(self, path: str = None): # Initialize the NelPipeline with the specified model self.pipeline = NelPipeline(NEL_MODEL) def __call__(self, data: Dict[str, Any]) -> List[Dict[str, Any]]: # Process incoming data inputs = data.get("inputs", "") if not isinstance(inputs, str): raise ValueError("Input must be a string.") # Preprocess, forward, and postprocess preprocessed = self.pipeline.preprocess(inputs) results = self.pipeline.postprocess(preprocessed) return results