import gradio as gr
import torch
import json
import pandas as pd
import numpy as np
from transformers import (
    AutoTokenizer,
    AutoModelForTokenClassification,
    AutoModelForSequenceClassification,
    RobertaForTokenClassification,
)
from json import JSONEncoder
from faker import Faker
from keras.utils import pad_sequences
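# Gradio app that anonymizes personal data in free text, CSV files and JSON files.
# It detects the input language (Spanish or English), runs a token-classification
# (NER) model, and either keeps the entity labels or substitutes Faker-generated
# fake values for the detected PER/ORG/LOC entities.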
class out_json():
def __init__(self, w,l):
self.word = w
self.label = l
class MyEncoder(JSONEncoder):
def default(self, o):
return o.__dict__
class Model:
def __init__(self):
self.texto=""
self.idioma=""
self.modelo_ner=""
self.categoria_texto=""
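    # Detect the input language with papluca/xlm-roberta-base-language-detection and
    # load the matching NER model (Spanish: BSC-LT/roberta_model_for_anonimization,
    # otherwise English: FacebookAI/xlm-roberta-large-finetuned-conll03-english).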
def identificacion_idioma(self,text):
self.texto=text
tokenizer = AutoTokenizer.from_pretrained("papluca/xlm-roberta-base-language-detection")
model = AutoModelForSequenceClassification.from_pretrained("papluca/xlm-roberta-base-language-detection")
inputs = tokenizer(text, padding=True, truncation=True, return_tensors="pt")
with torch.no_grad():
logits = model(**inputs).logits
preds = torch.softmax(logits, dim=-1)
id2lang = model.config.id2label
vals, idxs = torch.max(preds, dim=1)
        # return the language with the highest probability
maximo=vals.max()
idioma=''
porcentaje=0
for k, v in zip(idxs, vals):
if v.item()==maximo:
idioma,porcentaje=id2lang[k.item()],v.item()
if idioma=='es':
self.idioma="es"
self.modelo_ner='BSC-LT/roberta_model_for_anonimization'
self.faker_ = Faker('es_MX')
self.model = RobertaForTokenClassification.from_pretrained(self.modelo_ner)
else:
self.idioma="en"
self.faker_ = Faker('en_US')
self.modelo_ner="FacebookAI/xlm-roberta-large-finetuned-conll03-english"
self.model = AutoModelForTokenClassification.from_pretrained(self.modelo_ner)
self.categorizar_texto(self.texto)
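    # Merge sub-word pieces back into whole words ('▁' marks a word start) and
    # record the indices of the merged pieces so their labels can be skipped later.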
def reordenacion_tokens(self,tokens):
i=0
new_tokens=[]
        ig_tokens=[]  # indices in the label array to ignore
for token in tokens:
ind=len(new_tokens)
if i<len(tokens):
if token.startswith("▁"):
new_tokens.append(token)
i=i+1
else:
new_tokens[ind-1] = (new_tokens[ind-1] + token)
ig_tokens.append(i)
i=i+1
return (
new_tokens,
ig_tokens
)
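    # Keep only the labels whose token index was not merged away by reordenacion_tokens.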
def reordenacion_identificadores(self,ig_tokens,predicted_tokens_classes):
x=0
new_identificadores=[]
for token in predicted_tokens_classes:
if x not in ig_tokens:
new_identificadores.append(token)
x=x+1
else:
x=x+1
return new_identificadores
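    # Build a JSON string with one {word, label} entry for every token labelled as sensitive (label != 'O').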
    def salida_json(self,tokens,pre_tokens):
        out_list=[]
        i=0
        for t in tokens:
            if pre_tokens[i]!='O':
                a = out_json(t.replace('▁','').replace('Ġ',''),pre_tokens[i].replace('▁',''))
                out_list.append(a)
            i=i+1
        return MyEncoder().encode(out_list)
def salida_texto( self,tokens,pre_tokens):
new_labels = []
current_word = None
i=0
for token in tokens:
if pre_tokens[i]=='O' or 'MISC' in pre_tokens[i]:
new_labels.append(' ' +token.replace('▁',''))
else:
new_labels.append(' ' + pre_tokens[i])
i=i+1
a=''
for i in new_labels:
a = a+i
return a
#return new_labels
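    # Rebuild the text from the token ids, replacing every labelled token with its
    # label (or with the fake value the caller substituted for that label).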
def salida_texto_anonimizado(self, ids,pre_tokens):
new_labels = []
current_word = None
i=0
for identificador in pre_tokens:
if identificador=='O' or 'OTH' in identificador:
new_labels.append(self.tokenizer.decode(ids[i]))
else:
new_labels.append(' ' + identificador)
i=i+1
a=''
for i in new_labels:
a = a+i
return a
def formato_salida(self,out):
a=""
for i in out:
a = a + i.replace('▁','').replace(' ','') + ' '
return a
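    # Thin wrappers around Faker used to generate replacement values.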
def fake_pers(self):
        return self.faker_.name()
def fake_word(self):
return self.faker_.word()
def fake_first_name(self):
return self.faker_.first_name()
def fake_last_name(self):
return self.faker_.last_name()
def fake_address(self):
return self.faker_.address()
def fake_sentence(self,n):
return self.faker_.sentence(nb_words=n)
def fake_text(self):
return self.faker_.text()
def fake_company(self):
return self.faker_.company()
def fake_city(self):
return self.faker_.city()
def reemplazo_fake(self,identificadores):
new_iden=[]
for id in identificadores:
if 'PER' in id:
new_iden.append(self.fake_first_name())
elif 'ORG' in id:
new_iden.append(self.fake_company())
elif 'LOC' in id:
new_iden.append(self.fake_city())
else:
new_iden.append(id)
return new_iden
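    # Classify the topic of the text with elozano/bert-base-cased-news-category
    # and return (category, score).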
def categorizar_texto(self,texto):
name="elozano/bert-base-cased-news-category"
tokenizer = AutoTokenizer.from_pretrained(name)
model_ = AutoModelForSequenceClassification.from_pretrained(name)
inputs_ = tokenizer(texto, padding=True, truncation=True, return_tensors="pt")
with torch.no_grad():
logits = model_(**inputs_).logits
preds = torch.softmax(logits, dim=-1)
id2lang = model_.config.id2label
vals, idxs = torch.max(preds, dim=1)
        # return the category with the highest probability
maximo=vals.max()
cat=''
self.categoria_texto=''
porcentaje=0
for k, v in zip(idxs, vals):
if v.item()==maximo:
cat,porcentaje=id2lang[k.item()],v.item()
self.categoria_texto=cat
return cat, porcentaje
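    # Run NER over self.texto and return (JSON of sensitive words, anonymized text).
    # 'etiquetas' keeps the entity labels in the output instead of fake values.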
def predict(self,etiquetas):
categoria, porcentaje = self.categorizar_texto(self.texto)
print(categoria, porcentaje)
self.tokenizer = AutoTokenizer.from_pretrained(self.modelo_ner)
tokens = self.tokenizer.tokenize(self.texto)
ids = self.tokenizer.convert_tokens_to_ids(tokens)
input_ids = torch.tensor([ids])
with torch.no_grad():
logits = self.model(input_ids).logits
predicted_token_class_ids = logits.argmax(-1)
predicted_tokens_classes = [self.model.config.id2label[t.item()] for t in predicted_token_class_ids[0]]
labels = predicted_token_class_ids
loss = self.model(input_ids, labels=labels).loss
        if (self.idioma=='es'):
            out1 = self.salida_json(tokens,predicted_tokens_classes)  # Spanish: only the sensitive words, as JSON
            if etiquetas:
                out2 = self.salida_texto_anonimizado(ids,predicted_tokens_classes)  # labels only
            else:
                out2 = self.salida_texto_anonimizado(ids,self.reemplazo_fake(predicted_tokens_classes))  # Spanish: full text with fake values
        else:
            new_tokens,ig_tokens=self.reordenacion_tokens(tokens)
            new_identificadores = self.reordenacion_identificadores(ig_tokens,predicted_tokens_classes)
            out1 = self.salida_json(new_tokens,new_identificadores)
            if etiquetas:
                out2 = self.salida_texto(new_tokens,new_identificadores)  # labels only
            else:
                out2 = self.salida_texto(new_tokens,self.reemplazo_fake(new_identificadores))
return (
out1,
str(out2)
)
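# Batch variant used for CSV/JSON files: pads every sentence of a column to a fixed
# length and runs the NER model over the whole column at once.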
class ModeloDataset:
def __init__(self):
self.texto=""
self.idioma=""
self.modelo_ner=""
self.categoria_texto=""
self.tokenizer = AutoTokenizer.from_pretrained("BSC-LT/roberta_model_for_anonimization")
def reordenacion_tokens(self,tokens):
i=0
new_tokens=[]
        ig_tokens=[]  # indices in the label array to ignore
for token in tokens:
ind=len(new_tokens)
if i<len(tokens):
if token.startswith("▁"):
new_tokens.append(token)
i=i+1
else:
new_tokens[ind-1] = (new_tokens[ind-1] + token)
ig_tokens.append(i)
i=i+1
return (
new_tokens,
ig_tokens
)
def reordenacion_identificadores(self,ig_tokens,predicted_tokens_classes, tamano):
x=0
new_identificadores=[]
for token in predicted_tokens_classes:
if x not in ig_tokens:
if len(new_identificadores) < tamano:
new_identificadores.append(token)
x=x+1
else:
x=x+1
return new_identificadores
def fake_pers(self):
        return self.faker_.name()
def fake_word(self):
return self.faker_.word()
def fake_first_name(self):
return self.faker_.first_name()
def fake_last_name(self):
return self.faker_.last_name()
def fake_address(self):
return self.faker_.address()
def fake_sentence(self,n):
return self.faker_.sentence(nb_words=n)
def fake_text(self):
return self.faker_.text()
def fake_company(self):
return self.faker_.company()
def fake_city(self):
return self.faker_.city()
def reemplazo_fake(self,identificadores):
if self.idioma=='es':
self.faker_ = Faker('es_MX')
else:
self.faker_ = Faker('en_US')
new_iden=[]
for id in identificadores:
if 'PER' in id:
new_iden.append(self.fake_first_name())
elif 'ORG' in id:
new_iden.append(self.fake_company())
elif 'LOC' in id:
new_iden.append(self.fake_city())
else:
new_iden.append(id)
return new_iden
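    # Tokenize, pad to MAX_LEN=128 and run the NER model over a list of sentences.
    # Spanish returns (padded ids, predicted labels); English returns (labels, merged tokens).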
def aplicar_modelo(self,_sentences,idioma, etiquetas):
if idioma=="es":
self.tokenizer = AutoTokenizer.from_pretrained("BSC-LT/roberta_model_for_anonimization")
tokenized_text=[self.tokenizer.tokenize(sentence) for sentence in _sentences]
ids = [self.tokenizer.convert_tokens_to_ids(x) for x in tokenized_text]
MAX_LEN=128
ids=pad_sequences(ids,maxlen=MAX_LEN,dtype="long",truncating="post", padding="post")
input_ids = torch.tensor(ids)
self.model = RobertaForTokenClassification.from_pretrained("BSC-LT/roberta_model_for_anonimization")
with torch.no_grad():
logits = self.model(input_ids).logits
predicted_token_class_ids = logits.argmax(-1)
i=0
_predicted_tokens_classes=[]
for a in predicted_token_class_ids:
#_predicted_tokens_classes[i]=[model.config.id2label[t.item()] for t in predicted_token_class_ids[i]]
_predicted_tokens_classes.append([self.model.config.id2label[t.item()] for t in predicted_token_class_ids[i]])
i=i+1
labels = predicted_token_class_ids
loss = self.model(input_ids, labels=labels).loss
return ids,_predicted_tokens_classes
else:
            print('language:',idioma)
self.tokenizer = AutoTokenizer.from_pretrained("FacebookAI/xlm-roberta-large-finetuned-conll03-english")
tokenized_text=[self.tokenizer.tokenize(sentence) for sentence in _sentences]
ids = [self.tokenizer.convert_tokens_to_ids(x) for x in tokenized_text]
MAX_LEN=128
ids=pad_sequences(ids,maxlen=MAX_LEN,dtype="long",truncating="post", padding="post")
input_ids = torch.tensor(ids)
self.model = AutoModelForTokenClassification.from_pretrained("FacebookAI/xlm-roberta-large-finetuned-conll03-english")
with torch.no_grad():
logits = self.model(input_ids).logits
predicted_token_class_ids = logits.argmax(-1)
i=0
_predicted_tokens_classes=[]
for a in predicted_token_class_ids:
#_predicted_tokens_classes[i]=[model.config.id2label[t.item()] for t in predicted_token_class_ids[i]]
_predicted_tokens_classes.append([self.model.config.id2label[t.item()] for t in predicted_token_class_ids[i]])
i=i+1
labels = predicted_token_class_ids
loss = self.model(input_ids, labels=labels).loss
new_tokens=[]
ig_tok=[]
i=0
new_identificadores=[]
for item in tokenized_text:
aux1, aux2= self.reordenacion_tokens(item)
new_tokens.append(aux1)
ig_tok.append(aux2)
for items in _predicted_tokens_classes:
aux=self.reordenacion_identificadores(ig_tok[i],items,len(new_tokens[i]))
new_identificadores.append(aux)
i=i+1
print('new_identificadores:',new_identificadores, ' ',len(new_identificadores) )
return new_identificadores, new_tokens
#return ids, _predicted_tokens_classes
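    # Rebuild a Spanish sentence from token ids, skipping the <s> marker and
    # substituting the label (or fake value) for every labelled token.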
def salida_texto_es( self,ids,pre_tokens):
new_labels = []
current_word = None
i=0
for identificador in pre_tokens:
if (self.tokenizer.decode(ids[i])!="<s>"):
if identificador=='O':
new_labels.append(self.tokenizer.decode(ids[i]))
else:
new_labels.append(' ' + identificador)
i=i+1
return new_labels
def salida_texto( self,tokens,pre_tokens):
new_labels = []
current_word = None
i=0
for token in tokens:
if pre_tokens[i]=='O' or 'MISC' in pre_tokens[i]:
new_labels.append(' ' +token.replace('▁',''))
else:
new_labels.append(' ' + pre_tokens[i])
i=i+1
a=''
for i in new_labels:
a = a+i
return a
def salida_texto2(self, tokens,labels,etiquetas):
i=0
out=[]
for iden in labels:
if etiquetas:
out.append(self.salida_texto( iden,np.array(tokens[i])))
else:
out.append(self.salida_texto(iden,self.reemplazo_fake(np.array(tokens[i]))))
i=i+1
return out
def salida_texto2_es(self, ids,pre_tokens,etiquetas):
i=0
out=[]
for iden in pre_tokens:
if i<len(ids):
if etiquetas:
out.append(self.salida_texto_es( ids[i],np.array(pre_tokens[i])) )
else:
out.append(self.salida_texto_es( ids[i],self.reemplazo_fake(np.array(pre_tokens[i]))))
i=i+1
return out
def unir_array(self,_out):
i=0
salida=[]
for item in _out:
salida.append("".join(str(x) for x in _out[i]))#rev space
i=i+1
return salida
def unir_columna_valores(self,df,columna):
out = ','.join(df[columna])
return out
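# Helper for loading a JSON file and flattening its nested structure into a pandas DataFrame.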
class utilJSON:
def __init__(self,archivo):
with open(archivo, encoding='utf-8') as f:
self.data = json.load(f)
def obtener_keys_json(self,data):
out=[]
for key in data:
out.append(key)
return(out)
    ###
    ### The "flatten_json" function below is taken from https://levelup.gitconnected.com/a-deep-dive-into-nested-json-to-data-frame-with-python-69bdabb41938
    ### Renu Khandelwal, Jul 23, 2023
def flatten_json(self,y):
try:
out = {}
def flatten(x, name=''):
if type(x) is dict:
for a in x:
flatten(x[a], name + a + '_')
elif type(x) is list:
i = 0
for a in x:
flatten(a, name + str(i) + '_')
i += 1
else:
out[name[:-1]] = x
flatten(y)
return out
except json.JSONDecodeError:
print("Error: The JSON document could not be decoded.")
except TypeError:
print("Error: Invalid operation or function argument type.")
except KeyError:
print("Error: One or more keys do not exist.")
except ValueError:
print("Error: Invalid value detected.")
except Exception as e:
# Catch any other exceptions
print(f"An unexpected error occurred: {str(e)}")
def obtener_dataframe(self,data):
claves=self.obtener_keys_json(data)
print(claves)
if len(claves)==1:
#Flatten nested dictionaries and lists
data_flattened = [self.flatten_json(class_info) for class_info in data[claves[0]]]
# Create DataFrame from flattened JSON
df = pd.DataFrame(data_flattened)
else:
#df = pd.json_normalize(data)
data_flattened = [self.flatten_json(class_info) for class_info in data]
df = pd.DataFrame(data_flattened)
return df
modelo = ModeloDataset()
model = Model()
def get_model():
return model
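# Gradio callback: if 'texto' is non-empty, anonymize it directly; otherwise process the
# uploaded CSV or JSON file column by column. 'etiquetas' keeps entity labels instead of
# generating fake replacement values.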
def procesar(texto,archivo, etiquetas):
if len(texto)>0:
print('text')
model.identificacion_idioma(texto)
return model.predict(etiquetas),gr.Dataframe(),gr.File()
else:
if archivo.name.split(".")[1]=="csv":
print('csv')
df=pd.read_csv(archivo.name,delimiter=",")
df_new = pd.DataFrame( columns=df.columns.values)
for item in df.columns.values:
sentences=df[item]
model.identificacion_idioma(sentences[0])
modelo.idioma=model.idioma
ides, predicted = modelo.aplicar_modelo(sentences,model.idioma,etiquetas)
if model.idioma=="es":
out=modelo.salida_texto2_es( ides,predicted,etiquetas)
print('out es:',out)
df_new[item] = modelo.unir_array(out)
else:
out=modelo.salida_texto2( ides,predicted,etiquetas)#tokens,labels
print('out en:',out)
df_new[item] = modelo.unir_array(out)
return "", df_new, df_new.to_csv(sep='\t', encoding='utf-8',index=False)
#return "", df_new, df_new.to_excel( index=False)
else:
print('json')
if archivo.name.split(".")[1]=="json":
util = utilJSON(archivo.name)
df=util.obtener_dataframe(util.data)
df_new = pd.DataFrame( columns=df.columns.values)
for item in df.columns.values:
sentences=df[item]
ides, predicted = modelo.aplicar_modelo(sentences,"en",etiquetas)
                    out=modelo.salida_texto2( ides,predicted,etiquetas)  # tokens, labels
print('out:',out)
df_new[item] = modelo.unir_array(out)
#out=modelo.salida_texto2( ides,predicted)
#df_new[item] = modelo.unir_array(out)
#return "", df, df.to_csv(sep='\t', encoding='utf-8',index=False)
return "", df_new, df_new.to_csv(sep='\t', encoding='utf-8',index=False)
demo = gr.Interface(fn=procesar,inputs=["text",gr.File(), "checkbox"] , outputs=["text",gr.Dataframe(interactive=False),"text"])
#
demo.launch(share=True)