Spaces:
Runtime error
Runtime error
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM | |
import pickle | |
import torch | |
from transformers import PegasusTokenizer, PegasusForConditionalGeneration | |
import tensorflow as tf | |
from tensorflow.python.lib.io import file_io | |
from nltk.tokenize import sent_tokenize | |
import io | |
#contents = pickle.load(f) becomes... | |
#contents = CPU_Unpickler(f).load() | |
model_path = "finbert.sav" | |
#load model from drive | |
with open(model_path, "rb") as f: | |
model1= pickle.load(f) | |
tf.compat.v1.disable_eager_execution() | |
# Let's load the model and the tokenizer | |
model_name = "human-centered-summarization/financial-summarization-pegasus" | |
tokenizer = PegasusTokenizer.from_pretrained(model_name) | |
model2 = PegasusForConditionalGeneration.from_pretrained(model_name) | |
#tokenizer = AutoTokenizer.from_pretrained(checkpoint) | |
#model = AutoModelForSeq2SeqLM.from_pretrained(checkpoint) | |
import nltk | |
from finbert_embedding.embedding import FinbertEmbedding | |
import pandas as pd | |
from nltk.cluster import KMeansClusterer | |
import numpy as np | |
import os | |
from scipy.spatial import distance_matrix | |
from tensorflow.python.lib.io import file_io | |
import pickle | |
nltk.download('punkt') | |
def finbert(word): | |
# Instantiate path to store each text Datafile in dataframe | |
data_path = "/tmp/" | |
if not os.path.exists(data_path): | |
os.makedirs(data_path) | |
input_ = "/tmp/input.txt" | |
# Write file to disk so we can convert each datapoint to a txt file | |
with open(input_, "w") as file: | |
file.write(word) | |
# read the written txt into a variable to start clustering | |
with open(input_ , 'r') as f: | |
text = f.read() | |
# Create tokens from the txt file | |
tokens = nltk.sent_tokenize(text) | |
# Strip out trailing and leading white spaces from tokens | |
sentences = [word.strip() for word in tokens] | |
#Create a DataFrame from the tokens | |
data = pd.DataFrame(sentences) | |
# Assign name Sentences to the column containing text tokens | |
data.columns = ['Sentences'] | |
# Function to create numerical embeddings for each text tokens in dataframe | |
def get_sentence_embeddings(): | |
# Create empty list for sentence embeddings | |
sentence_list = [] | |
# Loop through all sentences and append sentence embeddings to list | |
for i in tokens: | |
sentence_embedding = model1.sentence_vector(i) | |
sentence_list.append(sentence_embedding) | |
# Create empty list for ndarray | |
sentence_array=[] | |
# Loop through sentence list and change data type from tensor to array | |
for i in sentence_list: | |
sentence_array.append(i.numpy()) | |
# return sentence embeddings as list | |
return sentence_array | |
# Apply get_sentence_embeddings to dataframe to create column Embeddings | |
data['Embeddings'] = get_sentence_embeddings() | |
#Number of expected sentences | |
NUM_CLUSTERS = 10 | |
iterations = 8 | |
# Convert Embeddings into an array and store in variable X | |
X = np.array(data['Embeddings'].to_list()) | |
#Build k-means cluster algorithm | |
Kclusterer = KMeansClusterer( | |
NUM_CLUSTERS, | |
distance = nltk.cluster.util.cosine_distance, | |
repeats = iterations, avoid_empty_clusters = True) | |
# if length of text is too short, K means would return an error | |
# use the try except block to return the text as result if it is too short. | |
try: | |
assigned_clusters = Kclusterer.cluster(X,assign_clusters=True) | |
# Apply Kmean Cluster to DataFrame and create new columns Clusters and Centroid | |
data['Cluster'] = pd.Series(assigned_clusters, index = data.index) | |
data['Centroid'] = data['Cluster'].apply(lambda x: Kclusterer.means()[x]) | |
# return the text if clustering algorithm catches an exceptiona and move to the next text file | |
except ValueError: | |
return text | |
# function that computes the distance of each embeddings from the centroid of the cluster | |
def distance_from_centroid(row): | |
return distance_matrix([row['Embeddings']], [row['Centroid'].tolist()])[0][0] | |
# apply distance_from_centroid function to data | |
data['Distance_From_Centroid'] = data.apply(distance_from_centroid, axis =1) | |
## Return Final Summary | |
summary = " ".join(data.sort_values( | |
'Distance_From_Centroid', | |
ascending = True).groupby('Cluster').head(1).sort_index()['Sentences'].tolist()) | |
import re | |
words = list() | |
for text in summary.split(): | |
text = re.sub(r'\n','',text) | |
text = re.sub(r'\s$','',text) | |
words.append(text) | |
summary = " ".join(words) | |
return (summary," Length of Input:---->"+str(len(word))," Length of Output:----> "+str(len(summary))) | |
def pegasus(text): | |
'''A function to obtain summaries for each tokenized sentence. | |
It returns a summarized document as output''' | |
import nltk | |
nltk.download('punkt') | |
import os | |
data_path = "/tmp/" | |
if not os.path.exists(data_path): | |
os.makedirs(data_path) | |
input_ = "/tmp/input.txt" | |
with open(input_, "w") as file: | |
file.write(text) | |
# read the written txt into a variable | |
with open(input_ , 'r') as f: | |
text_ = f.read() | |
def tokenized_sentences(file): | |
'''A function to generate chunks of sentences and texts. | |
Returns tokenized texts''' | |
# Create empty arrays | |
tokenized_sentences = [] | |
sentences = [] | |
length = 0 | |
for sentence in sent_tokenize(file): | |
length += len(sentence) | |
# 512 is the maximum input length for the Pegasus model | |
if length < 512: | |
sentences.append(sentence) | |
else: | |
tokenized_sentences.append(sentences) | |
sentences = [sentence] | |
length = len(sentence) | |
sentences = [sentence.strip() for sentence in sentences] | |
# Append all tokenized sentences | |
if sentences: | |
tokenized_sentences.append(sentences) | |
return tokenized_sentences | |
tokenized = tokenized_sentences(text_) | |
# Use GPU if available | |
device = 'cuda' if torch.cuda.is_available() else 'cpu' | |
global summary | |
# Create an empty array for all summaries | |
summary = [] | |
# Loop to encode tokens, to generate abstractive summary and finally decode tokens | |
for token in tokenized: | |
# Encoding | |
inputs = tokenizer.encode(' '.join(token), truncation=True, return_tensors='pt') | |
# Use CPU or GPU | |
inputs = inputs.to(device) | |
# Get summaries from transformer model | |
all_summary = model2.to(device).generate(inputs,do_sample=True, | |
max_length=50, top_k=50, top_p=0.95, | |
num_beams = 5, early_stopping=True) | |
# num_return_sequences=5) | |
# length_penalty=0.2, no_repeat_ngram_size=2 | |
# min_length=10, | |
# max_length=50) | |
# Decoding | |
output = [tokenizer.decode(each_summary, skip_special_tokens=True, clean_up_tokenization_spaces=False) for each_summary in all_summary] | |
# Append each output to array | |
summary.append(output) | |
# Get final summary | |
summary = [sentence for each in summary for sentence in each] | |
final = "".join(summary) | |
return final | |
import gradio as gr | |
interface1 = gr.Interface(fn=finbert, | |
inputs =gr.inputs.Textbox(lines=15,placeholder="Enter your text !!",label='Input-10k Sections'), | |
outputs=gr.outputs.Textbox(label='Output')).launch() | |