Spaces:
Running
Running
import os | |
#os.environ["TOKENIZERS_PARALLELISM"] = "true" | |
#os.environ["HF_HOME"] = "/mnt/c/..." | |
#os.environ["CUDA_PATH"] = "/mnt/c/..." | |
# Report the Hugging Face cache location. The lines that would set HF_HOME are
# commented out, so use .get() to avoid a KeyError at start-up when the
# variable is not defined in the environment (prints None in that case).
print(os.environ.get("HF_HOME"))
import gradio as gr | |
from datetime import datetime | |
import pandas as pd | |
import numpy as np | |
from sklearn.cluster import KMeans | |
from sklearn.feature_extraction.text import CountVectorizer | |
from transformers import AutoModel | |
import funcs.anonymiser as anon | |
from torch import cuda, backends, version | |
# Check for torch cuda
# Query CUDA availability once and derive every downstream flag from it.
cuda_ready = cuda.is_available()
print("Is CUDA enabled? ", cuda_ready)
# NOTE(review): the value printed here is backends.cudnn.enabled, which looks
# like the cuDNN toggle rather than a device-presence check - confirm the label.
print("Is a CUDA device available on this computer?", backends.cudnn.enabled)

# torch_device is only used for the log line below; low_resource_mode becomes
# the default of the "Low resource mode" dropdown in the UI.
torch_device, low_resource_mode = ("gpu", "No") if cuda_ready else ("cpu", "Yes")

if cuda_ready:
    print("Cuda version installed is: ", version.cuda)
    #os.system("nvidia-smi")

print("Device used is: ", torch_device)
#os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
from bertopic import BERTopic | |
#from sentence_transformers import SentenceTransformer | |
#from bertopic.backend._hftransformers import HFTransformerBackend | |
#from cuml.manifold import UMAP | |
#umap_model = UMAP(n_components=5, n_neighbors=15, min_dist=0.0) | |
# Take a single timestamp so both date stamps always describe the same
# calendar day, even if the module happens to load right at midnight.
_launch_time = datetime.now()
today = _launch_time.strftime("%d%m%Y")      # day-first stamp (DDMMYYYY)
today_rev = _launch_time.strftime("%Y%m%d")  # year-first stamp (YYYYMMDD), used in output file names
from funcs.helper_functions import dummy_function, put_columns_in_df, read_file, get_file_path_end | |
from funcs.representation_model import representation_model | |
from funcs.embeddings import make_or_load_embeddings | |
# Load embeddings
#embedding_model_name = "BAAI/bge-small-en-v1.5"
#embedding_model = SentenceTransformer(embedding_model_name)

# Pinning a Jina revision for security purposes: https://www.baseten.co/blog/pinning-ml-model-revisions-for-compatibility-and-security/
# Save Jina model locally as described here: https://huggingface.co/jinaai/jina-embeddings-v2-base-en/discussions/29
embeddings_name = "jinaai/jina-embeddings-v2-small-en"
local_embeddings_location = "model/jina/"
revision_choice = "b811f03af3d4d7ea72a7c25c802b21fc675a5d99"

# Prefer the copy saved under local_embeddings_location; if that cannot be
# loaded, fall back to fetching the pinned revision by model name.
# `except Exception` (rather than a bare `except:`) lets KeyboardInterrupt and
# SystemExit propagate, and the failure reason is printed instead of being
# silently discarded.
try:
    embedding_model = AutoModel.from_pretrained(
        local_embeddings_location,
        revision=revision_choice,
        trust_remote_code=True,
        local_files_only=True,
        device_map="auto",
    )
except Exception as local_load_error:
    print("Could not load embedding model from", local_embeddings_location, "-", local_load_error)
    embedding_model = AutoModel.from_pretrained(
        embeddings_name,
        revision=revision_choice,
        trust_remote_code=True,
        device_map="auto",
    )
def extract_topics(in_files, in_file, min_docs_slider, in_colnames, max_topics_slider, candidate_topics, in_label, anonymise_drop, return_intermediate_files, embeddings_super_compress, low_resource_mode_opt):
    """Fit a BERTopic model on one text column and write the results to disk.

    Args:
        in_files: Table holding the loaded data; it is indexed by column name
            and must support ``.str`` access and ``to_csv`` (the UI passes the
            DataFrame kept in ``data_state``).
        in_file: Uploaded file objects, each exposing a ``.name`` path. May be
            ``None`` or empty when nothing has been uploaded.
        min_docs_slider: Passed to BERTopic as ``min_topic_size``.
        in_colnames: Selected text columns; only the first one is used.
        max_topics_slider: Passed to BERTopic as ``nr_topics``.
        candidate_topics: Optional uploaded file (with ``.name``) listing
            zero-shot candidate topics; falsy to let BERTopic find topics.
        in_label: Selected label columns for the visualisation; the first one
            is used, falling back to the text column when empty.
        anonymise_drop: ``"Yes"`` to run the anonymiser over the text column
            first and save the result to ``anonymised_data.csv``.
        return_intermediate_files: Forwarded to ``make_or_load_embeddings``.
        embeddings_super_compress: Forwarded to ``make_or_load_embeddings``.
        low_resource_mode_opt: Forwarded to ``make_or_load_embeddings``.

    Returns:
        A 3-tuple matching the three Gradio outputs wired to this function:
        ``(message, output_files, figure)``. On success ``output_files`` is a
        list of the four files written and ``figure`` is the document plot.
        When nothing can be processed, or no topics are found, ``figure`` is
        ``None`` and ``output_files`` is ``None`` or the original data file.
    """
    # Tolerate a missing upload (None) as well as an empty list.
    file_list = [file_obj.name for file_obj in (in_file or [])]

    # Data files are whatever is left after dropping tokenised / npz / gz uploads.
    data_file_paths = [
        path for path in file_list
        if "tokenised" not in path and "npz" not in path.lower() and "gz" not in path.lower()
    ]

    # Guard clauses: previously these cases raised IndexError on [0].
    if not data_file_paths:
        return "No data file found. Please upload a data file before extracting topics.", None, None
    if not in_colnames:
        return "No column selected. Please choose a column to find topics in.", None, None

    data_file_path = data_file_paths[0]
    # The lower-cased name is used only to build output file names; the path
    # itself is kept as uploaded so it still points at the real file.
    data_file_name_no_ext = get_file_path_end(data_file_path.lower())

    in_colnames_list_first = in_colnames[0]
    in_label_list_first = in_label[0] if in_label else in_colnames_list_first

    if anonymise_drop == "Yes":
        in_files_anon_col, _anonymisation_success = anon.anonymise_script(in_files, in_colnames_list_first, anon_strat="replace")
        in_files[in_colnames_list_first] = in_files_anon_col[in_colnames_list_first]
        in_files.to_csv("anonymised_data.csv")

    docs = list(in_files[in_colnames_list_first].str.lower())
    label_col = in_files[in_label_list_first]

    # Check if embeddings are being loaded in
    ## Load in pre-embedded file if exists
    embeddings_out, reduced_embeddings = make_or_load_embeddings(docs, file_list, data_file_name_no_ext, embedding_model, return_intermediate_files, embeddings_super_compress, low_resource_mode_opt)

    # Countvectoriser removes stopwords, combines terms up to 2 together:
    min_df_val = min_docs_slider if min_docs_slider < 3 else 3
    print(min_df_val)

    # NOTE(review): min_df_val is computed and printed but the vectoriser below
    # uses a fixed min_df=0.1 - confirm which of the two is intended.
    vectoriser_model = CountVectorizer(stop_words="english", ngram_range=(1, 2), min_df=0.1)

    # Settings shared by the free and zero-shot topic models.
    topic_model_kwargs = dict(
        embedding_model=embedding_model,
        #hdbscan_model=cluster_model,
        vectorizer_model=vectoriser_model,
        min_topic_size=min_docs_slider,
        nr_topics=max_topics_slider,
        representation_model=representation_model,
        verbose=True,
    )

    # Do this if you have pre-assigned topics
    if candidate_topics:
        zero_shot_topics_list = read_file(candidate_topics.name)
        zero_shot_topics_list_lower = [x.lower() for x in zero_shot_topics_list]
        print(zero_shot_topics_list_lower)
        topic_model_kwargs.update(
            zeroshot_topic_list=zero_shot_topics_list_lower,
            zeroshot_min_similarity=0.7,
        )

    topic_model = BERTopic(**topic_model_kwargs)
    topics_text, _probs = topic_model.fit_transform(docs, embeddings_out)

    if topics_text is None or len(topics_text) == 0:
        # Three values are required because the click handler declares three
        # outputs (text, file, plot); the plot is simply left empty.
        return "No topics found, original file returned", data_file_path, None

    topic_det_output_name = "topic_details_" + today_rev + ".csv"
    topic_dets = topic_model.get_topic_info()
    topic_dets.to_csv(topic_det_output_name)

    doc_det_output_name = "doc_details_" + today_rev + ".csv"
    doc_dets = topic_model.get_document_info(docs)[["Document", "Topic", "Probability", "Name", "Representative_document"]]
    doc_dets.to_csv(doc_det_output_name)

    topics_text_out_str = str(topic_dets["KeyBERT"])
    output_text = "Topics: " + topics_text_out_str

    # Outputs
    embedding_file_name = data_file_name_no_ext + '_' + 'embeddings.npz'
    np.savez_compressed(embedding_file_name, embeddings_out)

    topic_model_save_name = data_file_name_no_ext + "_topics_" + today_rev + ".pkl"
    topic_model.save(topic_model_save_name, serialization='pickle', save_embedding_model=False, save_ctfidf=False)

    # Visualise the topics:
    topics_vis = topic_model.visualize_documents(label_col, reduced_embeddings=reduced_embeddings, hide_annotations=True, hide_document_hover=False, custom_labels=True)

    return output_text, [doc_det_output_name, topic_det_output_name, embedding_file_name, topic_model_save_name], topics_vis
# ## Gradio app - extract topics

# Option lists reused by several dropdowns below.
yes_no_choices = ["Yes", "No"]
column_placeholder = ["Choose a column"]

block = gr.Blocks(theme=gr.themes.Base())

with block:

    # Holds the table written by put_columns_in_df on upload and read back by
    # extract_topics as its first argument.
    data_state = gr.State(pd.DataFrame())

    gr.Markdown(
    """
    # Extract topics from text
    Enter open text below to get topics. You can copy and paste text directly, or upload a file and specify the column that you want to topics.
    """)

    #with gr.Accordion("I will copy and paste my open text", open = False):
    #    in_text = gr.Textbox(label="Copy and paste your open text here", lines = 5)

    with gr.Tab("Load files and find topics"):

        with gr.Accordion("Load data file", open=True):
            in_files = gr.File(label="Input text from file", file_count="multiple")
            with gr.Row():
                in_colnames = gr.Dropdown(
                    choices=list(column_placeholder),
                    multiselect=True,
                    label="Select column to find topics (first will be chosen if multiple selected).",
                )
                in_label = gr.Dropdown(
                    choices=list(column_placeholder),
                    multiselect=True,
                    label="Select column to for labelling documents in the output visualisation.",
                )

        with gr.Accordion("I have my own list of topics. File should have at least one column with a header and topic keywords in cells below. Topics will be taken from the first column of the file", open=False):
            candidate_topics = gr.File(label="Input topics from file (csv)")

        with gr.Row():
            min_docs_slider = gr.Slider(
                minimum=1,
                maximum=1000,
                value=15,
                step=1,
                label="Minimum number of documents needed to create topic",
            )
            max_topics_slider = gr.Slider(
                minimum=2,
                maximum=500,
                value=3,
                step=1,
                label="Maximum number of topics",
            )

        with gr.Row():
            topics_btn = gr.Button("Extract topics")

        with gr.Row():
            output_single_text = gr.Textbox(label="Output example (first example in dataset)")
            output_file = gr.File(label="Output file")

        plot = gr.Plot(label="Visualise your topics here:")

    with gr.Tab("Load and data processing options"):
        with gr.Accordion("Process data on load", open=True):
            anonymise_drop = gr.Dropdown(
                value="No",
                choices=list(yes_no_choices),
                multiselect=False,
                label="Anonymise data on file load.",
            )
            return_intermediate_files = gr.Dropdown(
                label="Return intermediate processing files from file preparation. Files can be loaded in to save processing time in future.",
                value="No",
                choices=list(yes_no_choices),
            )
            embedding_super_compress = gr.Dropdown(
                label="Round embeddings to three dp for smaller files with less accuracy.",
                value="No",
                choices=list(yes_no_choices),
            )
            # Default comes from the CUDA check at start-up ("Yes" when no GPU was found).
            low_resource_mode_opt = gr.Dropdown(
                label="Low resource mode (non-AI embeddings, no LLM-generated topic names).",
                value=low_resource_mode,
                choices=list(yes_no_choices),
            )

    # Update column names dropdown when file uploaded
    in_files.upload(
        fn=put_columns_in_df,
        inputs=[in_files],
        outputs=[in_colnames, in_label, data_state],
    )
    in_colnames.change(fn=dummy_function, inputs=in_colnames, outputs=None)

    # The order of topic_inputs must follow the parameter order of extract_topics.
    topic_inputs = [
        data_state,
        in_files,
        min_docs_slider,
        in_colnames,
        max_topics_slider,
        candidate_topics,
        in_label,
        anonymise_drop,
        return_intermediate_files,
        embedding_super_compress,
        low_resource_mode_opt,
    ]
    topic_outputs = [output_single_text, output_file, plot]
    topics_btn.click(fn=extract_topics, inputs=topic_inputs, outputs=topic_outputs, api_name="topics")

queued_app = block.queue()
queued_app.launch(debug=True)#, server_name="0.0.0.0", ssl_verify=False, server_port=7860)