|
from transformers import Pipeline |
|
import nltk |
|
import requests |
|
import torch |
|
|
|
# Download the NLTK tagger resources as an import-time side effect, in the
# same order as before.
for _tagger_resource in (
    "averaged_perceptron_tagger",
    "averaged_perceptron_tagger_eng",
):
    nltk.download(_tagger_resource)
del _tagger_resource  # keep the module namespace free of the loop variable

# Model name constant (not referenced by the code visible in this file).
NEL_MODEL = "nel-mgenre-multilingual"
|
|
|
|
|
def get_wikipedia_page_props(input_str: str):
    """
    Look up the Wikidata QID of a Wikipedia page via the MediaWiki API.

    The page is queried on ``https://<language>.wikipedia.org/w/api.php``
    with ``action=query`` and ``prop=pageprops``; the QID is read from the
    ``wikibase_item`` page property of the first page in the response.

    Args:
        input_str (str): The input string in the format "page_name >> language".

    Returns:
        tuple: ``(qid, language)``. ``qid`` is "NIL" when the request fails,
        the response cannot be parsed, or the page has no ``wikibase_item``
        property. If ``input_str`` does not split into exactly two parts on
        " >> ", the tuple
        ``("Invalid input format. Use 'page_name >> language'.", "None")``
        is returned instead.
    """
    try:
        page_name, language = input_str.split(" >> ")
    except ValueError:
        return "Invalid input format. Use 'page_name >> language'.", "None"

    page_name = page_name.strip()
    language = language.strip()

    wikipedia_url = f"https://{language}.wikipedia.org/w/api.php"
    wikipedia_params = {
        "action": "query",
        "prop": "pageprops",
        "format": "json",
        "titles": page_name,
    }

    qid = "NIL"
    try:
        # A timeout keeps an unresponsive server from blocking the caller
        # indefinitely.
        response = requests.get(wikipedia_url, params=wikipedia_params, timeout=10)
        response.raise_for_status()
        pages = response.json()["query"].get("pages") or {}

        # Only the first page of the response is inspected.
        first_page = next(iter(pages.values()), {})
        page_props = first_page.get("pageprops") or {}
        qid = page_props.get("wikibase_item", qid)
    except (
        requests.RequestException,
        ValueError,
        KeyError,
        TypeError,
        AttributeError,
    ):
        # Best-effort lookup: transport errors, undecodable JSON and
        # unexpected response shapes all resolve to "NIL".
        qid = "NIL"

    # Single exit point: every path yields a (qid, language) tuple, including
    # a response whose "query" object has no "pages" key.
    return qid, language
|
|
|
|
|
def get_wikipedia_title(qid, language="en"):
    """
    Resolve a Wikidata QID to the title and URL of its Wikipedia page.

    Calls the Wikidata ``wbgetentities`` API with ``props=sitelinks/urls``
    restricted to the ``<language>wiki`` site and reads the ``title`` and
    ``url`` of that sitelink.

    Args:
        qid: Wikidata entity id to look up. An empty value or the sentinel
            "NIL" is answered immediately without a network request.
        language: Language code used to build the ``<language>wiki`` site
            key. Defaults to "en".

    Returns:
        tuple: ``(title, url)`` of the sitelink, or ``("NIL", "None")`` when
        the QID is empty/"NIL", the request fails, or the response has no
        sitelink for the requested site.
    """
    not_found = ("NIL", "None")

    # Nothing to resolve; skip the round-trip that could only end in the
    # not-found result.
    if not qid or qid == "NIL":
        return not_found

    api_url = "https://www.wikidata.org/w/api.php"
    site_key = f"{language}wiki"
    params = {
        "action": "wbgetentities",
        "format": "json",
        "ids": qid,
        "props": "sitelinks/urls",
        "sitefilter": site_key,
    }

    try:
        # A timeout keeps an unresponsive server from blocking the caller
        # indefinitely.
        response = requests.get(api_url, params=params, timeout=10)
        response.raise_for_status()
        sitelink = response.json()["entities"][qid]["sitelinks"][site_key]
        return sitelink["title"], sitelink["url"]
    except (requests.RequestException, ValueError, KeyError, TypeError):
        # Transport errors, undecodable JSON, a missing entity/sitelink, or
        # an unexpected container type in the response all mean "not found".
        return not_found
|
|
|
|
|
class NelPipeline(Pipeline):
    """
    Entity-linking pipeline built on a generative model.

    The model's decoded output is treated as a "page_name >> language"
    string, resolved to a Wikidata QID with ``get_wikipedia_page_props`` and
    then to a Wikipedia title/URL with ``get_wikipedia_title``. The mention
    to link is expected to be wrapped in ``[START]`` ... ``[END]`` markers
    inside the input text.
    """

    def _sanitize_parameters(self, **kwargs):
        """
        Split call-time kwargs into (preprocess, forward, postprocess) kwargs.

        Only a ``text`` kwarg is forwarded, and only to ``preprocess``.
        """
        # NOTE(review): `preprocess` already takes `text` positionally, so a
        # forwarded `text=` kwarg looks like it would collide with it -
        # confirm whether this forwarding is actually needed.
        preprocess_kwargs = {}
        if "text" in kwargs:
            preprocess_kwargs["text"] = kwargs["text"]

        return preprocess_kwargs, {}, {}

    @staticmethod
    def _find_marked_entity(text):
        """
        Locate the mention wrapped in ``[START]`` ... ``[END]`` markers.

        Returns:
            tuple: ``(entity, lOffset, rOffset)`` where ``entity`` is the
            stripped text between the markers, ``lOffset`` is the index just
            past the first ``[START]`` and ``rOffset`` is the index of the
            first ``[END]`` that follows it. All three are ``None`` when no
            such marker pair exists.
        """
        start_token = "[START]"
        end_token = "[END]"

        marker_pos = text.find(start_token)
        if marker_pos == -1:
            return None, None, None

        start_idx = marker_pos + len(start_token)
        # Search for the end marker only after the start marker, so an
        # "[END]" appearing earlier in the text cannot produce an empty
        # entity with inverted offsets.
        end_idx = text.find(end_token, start_idx)
        if end_idx == -1:
            return None, None, None

        return text[start_idx:end_idx].strip(), start_idx, end_idx

    def preprocess(self, text, **kwargs):
        """
        Extract the marked mention and run generation for ``text``.

        Generation happens here rather than in ``_forward``, which only
        passes this method's result through.

        Args:
            text: Input string, optionally containing ``[START]``/``[END]``
                markers around the mention.

        Returns:
            tuple: ``(wikipedia_predictions, enclosed_entity, lOffset,
            rOffset, [percentages])`` - the decoded generated sequences, the
            marked mention with its offsets (or ``None`` values), and a
            one-element list holding the sequence confidence in percent.
        """
        enclosed_entity, lOffset, rOffset = self._find_marked_entity(text)

        encoded_inputs = self.tokenizer([text], return_tensors="pt").to(self.device)
        outputs = self.model.generate(
            **encoded_inputs,
            num_beams=1,
            num_return_sequences=1,
            max_new_tokens=30,
            return_dict_in_generate=True,
            output_scores=True,
        )

        wikipedia_predictions = self.tokenizer.batch_decode(
            outputs.sequences, skip_special_tokens=True
        )

        # Per-token log-probabilities of the generated sequence; their sum is
        # the log-probability of the whole sequence.
        transition_scores = self.model.compute_transition_scores(
            outputs.sequences, outputs.scores, normalize_logits=True
        )
        log_prob_sum = transition_scores[0].sum()

        # Convert the sequence log-probability to a percentage.
        sequence_confidence = torch.exp(log_prob_sum)
        percentages = sequence_confidence.cpu().numpy() * 100.0

        return wikipedia_predictions, enclosed_entity, lOffset, rOffset, [percentages]

    def _forward(self, inputs):
        """Return ``inputs`` unchanged; the model is run in ``preprocess``."""
        return inputs

    def postprocess(self, outputs, **kwargs):
        """
        Turn the generated predictions into linked-entity records.

        :param outputs: The tuple produced by ``preprocess``:
            ``(wikipedia_predictions, enclosed_entity, lOffset, rOffset,
            percentages)``.
        :param kwargs: Unused.
        :return: A list with one dict per prediction, with the keys
            ``surface``, ``wkd_id``, ``wkpedia_pagename``, ``wkpedia_url``,
            ``type`` (always "UNK"), ``confidence_nel``, ``lOffset`` and
            ``rOffset``.
        """
        wikipedia_predictions, enclosed_entity, lOffset, rOffset, percentages = outputs

        results = []
        for idx, wikipedia_name in enumerate(wikipedia_predictions):
            qid, language = get_wikipedia_page_props(wikipedia_name)

            if qid == "NIL":
                # No entity to resolve: use the not-found values directly
                # instead of querying Wikidata for a non-existent id.
                wkpedia_pagename, url = "NIL", "None"
            else:
                wkpedia_pagename, url = get_wikipedia_title(qid, language)

            results.append(
                {
                    "surface": enclosed_entity,
                    "wkd_id": qid,
                    "wkpedia_pagename": wkpedia_pagename,
                    "wkpedia_url": url,
                    "type": "UNK",
                    # Cast to a built-in float so the record stays
                    # serializable with the standard json module.
                    "confidence_nel": round(float(percentages[idx]), 2),
                    "lOffset": lOffset,
                    "rOffset": rOffset,
                }
            )

        return results
|
|