# Import stuff
import streamlit as st
import time
from transformers import pipeline
from transformers import AutoModelForSequenceClassification, AutoTokenizer
import os
import torch
import numpy as np
import pandas as pd
# Mitigates the duplicate OpenMP runtime error (OMP Error #15) that can occur on Macs
os.environ['KMP_DUPLICATE_LIB_OK'] = "True"
# Set the title
st.title("Sentiment Analysis App")
# Set the variables that should persist between refreshes of the app.
# logs is a map that records the results of past sentiment analysis queries.
# Type: dict {key --> value[]}
# key: model_name (string) - The name of the model being used
# value: (list of log entries) - Each entry is a list (log) describing one of the model's past results
# --> For the pretrained models, len(log) = 4
# --> log[0] (int) - The prediction of the model on its input
# --> 0 = Positive
# --> 1 = Negative
# --> 2 = Neutral (if applicable)
# --> log[1] (string) - The tweet/inputted string
# --> log[2] (string) - The judgement of the tweet/input (Positive/Neutral/Negative)
# --> log[3] (string) - The score of the prediction (includes '%' sign)
# --> For the finetuned model, len(log) = 6
# --> log[0] (int) - The prediction of the model on the toxicity of the input
# --> 0 = Nontoxic
# --> 1 = Toxic
# --> log[1] (string) - The tweet/inputted string
# --> log[2] (string) - The highest scoring overall category of toxicity out of:
# 'toxic', 'severe_toxic', 'obscene', 'threat', 'insult', and 'identity_hate'
# --> log[3] (string) - The score of log[2] (includes '%' sign)
# --> log[4] (string) - The predicted type of toxicity, the highest scoring category of toxicity out of:
# 'obscene', 'threat', 'insult', and 'identity_hate'
# --> log[5] (string) - The score of log[4] (includes '%' sign)
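# For example (illustrative values only, not real model outputs):
# logs['twitter-roberta-base-sentiment'] might contain:
# [0, "Great game last night!", "POSITIVE", "97.2%"]
# logs['Modified Bert Toxicity Classification'] might contain:
# [1, "<some toxic tweet>", "toxic", "91.4%", "insult", "64.0%"]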
if 'logs' not in st.session_state:
st.session_state.logs = dict()
# labels is a list of toxicity categories for the finetuned model
if 'labels' not in st.session_state:
st.session_state.labels = ['toxic', 'severe_toxic', 'obscene', 'threat', 'insult', 'identity_hate']
# filled is a boolean that records whether logs has been pre-populated with data.
if 'filled' not in st.session_state:
st.session_state.filled = False
# model is the finetuned model that I created. It wasn't working well when loaded locally, so I uploaded it to the
# HuggingFace Hub and load it here like any other pretrained model. I also set it to evaluation mode.
if 'model' not in st.session_state:
st.session_state.model = AutoModelForSequenceClassification.from_pretrained("Ptato/Modified-Bert-Toxicity-Classification")
st.session_state.model.eval()
# tokenizer is the same tokenizer that is used by the "bert-base-uncased" model, which my finetuned model is built off of.
# tokenizer is used to input the tweets into my model for prediction.
if 'tokenizer' not in st.session_state:
st.session_state.tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
# This form allows users to select their preferred model for analysis
form = st.form(key='Sentiment Analysis')
# st.session_state.options pre-sets the available model choices.
st.session_state.options = [
'bertweet-base-sentiment-analysis',
'distilbert-base-uncased-finetuned-sst-2-english',
'twitter-roberta-base-sentiment',
'Modified Bert Toxicity Classification'
]
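# The first three options map to pretrained models on the HuggingFace Hub (see the pipeline calls below);
# the last option is the finetuned toxicity model loaded into session_state above.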
# box is the dropdown box that users use to select their choice of model
box = form.selectbox('Select Pre-trained Model:', st.session_state.options, key=1)
# tweet refers to the text box for users to input their tweets.
# Has a default value of "\"We've seen in the last few months, unprecedented amounts of Voter Fraud.\" @SenTedCruz True!"
# (Tweeted by former president Donald Trump)
tweet = form.text_input(label='Enter text to analyze:', value="\"We've seen in the last few months, unprecedented amounts of Voter Fraud.\" @SenTedCruz True!")
# Submit button
submit = form.form_submit_button(label='Submit')
# Read in some test data for prepopulation
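# test.csv is expected to have a "comment_text" column (the layout of the Jigsaw toxic comment dataset)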
if 'df' not in st.session_state:
st.session_state.df = pd.read_csv("test.csv")
# Initializes logs if not already initialized
if not st.session_state.filled:
# Iterates through all the options, initializing the logs for each.
for s in st.session_state.options:
st.session_state.logs[s] = []
# Pre-populates logs if not already pre-populated
if not st.session_state.filled:
# Ensure pre-population doesn't happen again
st.session_state.filled = True
# Initialize 10 entries
for x in range(10):
# Helps me see which entry is being evaluated on the backend
print(x)
# Truncate tweets to 128 characters, as some models may not handle longer inputs
text = st.session_state.df["comment_text"].iloc[x][:128]
# Iterate through the models
for s in st.session_state.options:
# Reset everything
# pline is the pipeline, which is used to load in the proper HuggingFace model for analysis
pline = None
# predictions refer to the predictions made by each model
predictions = None
# encoding is used by the finetuned model as input
encoding = None
# logits and probs are used to transform the raw model outputs into displayable results
logits = None
probs = None
# Perform different actions based on the model selected by the user
if s == 'bertweet-base-sentiment-analysis':
pline = pipeline(task="sentiment-analysis", model="finiteautomata/bertweet-base-sentiment-analysis")
elif s == 'twitter-roberta-base-sentiment':
pline = pipeline(task="sentiment-analysis", model="cardiffnlp/twitter-roberta-base-sentiment")
elif s == 'distilbert-base-uncased-finetuned-sst-2-english':
pline = pipeline(task="sentiment-analysis", model="distilbert-base-uncased-finetuned-sst-2-english")
else:
# encode data
encoding = st.session_state.tokenizer(text, return_tensors="pt")
encoding = {k: v.to(st.session_state.model.device) for k, v in encoding.items()}
# feed data into model and store the predictions
predictions = st.session_state.model(**encoding)
# transform the logits into a probability for each toxicity category (scale of 0-1)
logits = predictions.logits
sigmoid = torch.nn.Sigmoid()
probs = sigmoid(logits.squeeze().cpu())
# Binarize the predictions: mark a category as predicted (1) where its probability is at least 0.5
predictions = np.zeros(probs.shape)
predictions[np.where(probs >= 0.5)] = 1
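# Note: a sigmoid (rather than a softmax) is used because this is multi-label classification;
# each category is scored independently. Illustration with made-up numbers:
# probs = [0.91, 0.08, 0.77, 0.02, 0.64, 0.05] -> predictions = [1., 0., 1., 0., 1., 0.]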
# Prepare the log entry
log = []
# If there was a pipeline, then we used a pretrained model.
if pline:
# Get the prediction
predictions = pline(text)
# Initialize the log to the proper shape
log = [0] * 4
# Record the text
log[1] = text
# The pipeline returns a single-element list holding only the highest-probability prediction, so this loop runs once
for p in predictions:
# Different models have different outputs, so we standardize them in the logs
# Note: some unnecessary repetition may occur here
if s == 'bertweet-base-sentiment-analysis':
if p['label'] == "POS":
log[0] = 0
log[2] = "POS"
log[3] = f"{ round(p['score'] * 100, 1)}%"
elif p['label'] == "NEU":
log[0] = 2
log[2] = f"{ p['label'] }"
log[3] = f"{round(p['score'] * 100, 1)}%"
else:
log[2] = "NEG"
log[0] = 1
log[3] = f"{round(p['score'] * 100, 1)}%"
elif s == 'distilbert-base-uncased-finetuned-sst-2-english':
if p['label'] == "POSITIVE":
log[0] = 0
log[2] = "POSITIVE"
log[3] = (f"{round(p['score'] * 100, 1)}%")
else:
log[2] = ("NEGATIVE")
log[0] = 1
log[3] = (f"{round(p['score'] * 100, 1)}%")
elif s == 'twitter-roberta-base-sentiment':
if p['label'] == "LABEL_2":
log[0] = 0
log[2] = ("POSITIVE")
log[3] = (f"{round(p['score'] * 100, 1)}%")
elif p['label'] == "LABEL_0":
log[0] = 1
log[2] = ("NEGATIVE")
log[3] = f"{round(p['score'] * 100, 1)}%"
else:
log[0] = 2
log[2] = "NEUTRAL"
log[3] = f"{round(p['score'] * 100, 1)}%"
# Otherwise, we are using the finetuned model
else:
#Initialize log to the proper shape and store the text
log = [0] * 6
log[1] = text
# Determine whether or not there was toxicity
if max(predictions) == 0:
# No toxicity, input log values as such
log[0] = 0
log[2] = ("NO TOXICITY")
log[3] = (f"{100 - round(probs[0].item() * 100, 1)}%")
log[4] = ("N/A")
log[5] = ("N/A")
# There was toxicity
else:
# Record the toxicity
log[0] = 1
# Find the highest-scoring overall toxicity category and the highest-scoring specific toxicity type
_max = 0
_max2 = 2
for i in range(1, len(predictions)):
if probs[i].item() > probs[_max].item():
_max = i
if i > 2 and probs[i].item() > probs[_max2].item():
_max2 = i
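# (Equivalent to taking an argmax over all six probabilities for _max, and over the
# four specific toxicity types at indices 2-5 for _max2.)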
# Input data into log
log[2] = (st.session_state.labels[_max])
log[3] = (f"{round(probs[_max].item() * 100, 1)}%")
log[4] = (st.session_state.labels[_max2])
log[5] = (f"{round(probs[_max2].item() * 100, 1)}%")
# Add the log to the proper model's logs
st.session_state.logs[s].append(log)
# Check if there was a submitted input
if submit and tweet:
# Small loading message :)
with st.spinner('Analyzing...'):
time.sleep(1)
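# (The one-second sleep is purely cosmetic, giving the spinner time to show.)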
# Double check that there was an input
if tweet is not None:
# Reset variable
pline = None
# Set up shape for output
# Pretrained models should have 3 columns, while the finetuned model should have 5
if box != 'Modified Bert Toxicity Classification':
col1, col2, col3 = st.columns(3)
else:
col1, col2, col3, col4, col5 = st.columns(5)
# Perform different actions based on the model selected by the user
if box == 'bertweet-base-sentiment-analysis':
pline = pipeline(task="sentiment-analysis", model="finiteautomata/bertweet-base-sentiment-analysis")
elif box == 'twitter-roberta-base-sentiment':
pline = pipeline(task="sentiment-analysis", model="cardiffnlp/twitter-roberta-base-sentiment")
elif box == 'distilbert-base-uncased-finetuned-sst-2-english':
pline = pipeline(task="sentiment-analysis", model="distilbert-base-uncased-finetuned-sst-2-english")
else:
# encode data
encoding = st.session_state.tokenizer(tweet, return_tensors="pt")
encoding = {k: v.to(st.session_state.model.device) for k,v in encoding.items()}
# feed data into model and store the predictions
predictions = st.session_state.model(**encoding)
# transform the logits into a probability for each toxicity category (scale of 0-1)
logits = predictions.logits
sigmoid = torch.nn.Sigmoid()
probs = sigmoid(logits.squeeze().cpu())
# Binarize the predictions: mark a category as predicted (1) where its probability is at least 0.5
predictions = np.zeros(probs.shape)
predictions[np.where(probs >= 0.5)] = 1
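# (Same encoding and multi-label thresholding as in the pre-population pass above.)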
# Title columns differently for different models
# The existence of pline implies that a pretrained model was used
if pline:
# Predict the tweet here
predictions = pline(tweet)
# Title the column
col2.header("Judgement")
else:
# Titling columns
col2.header("Category")
col4.header("Type")
col5.header("Score")
# Title more columns
col1.header("Tweet")
col3.header("Score")
# If we used a pretrained model, process the prediction below
if pline:
# Set log to correct shape
log = [0] * 4
# Store the tweet
log[1] = tweet
# The pipeline returns a single-element list holding only the highest-probability prediction, so this loop runs once
for p in predictions:
# Different models have different outputs, so we standardize them in the logs
# Note: some unnecessary repetition may occur here
if box == 'bertweet-base-sentiment-analysis':
if p['label'] == "POS":
# Only print the first 20 characters of the first line, so that the table lines up
# Also store the proper values into log while printing the outcome of this tweet
col1.success(tweet.split("\n")[0][:20])
log[0] = 0
col2.success("POS")
col3.success(f"{ round(p['score'] * 100, 1)}%")
log[2] = ("POS")
log[3] = (f"{ round(p['score'] * 100, 1)}%")
elif p['label'] == "NEU":
col1.warning(tweet.split("\n")[0][:20])
log[0] = 2
col2.warning(f"{ p['label'] }")
col3.warning(f"{round(p['score'] * 100, 1)}%")
log[2] = ("NEU")
log[3] = (f"{round(p['score'] * 100, 1)}%")
else:
log[0] = 1
col1.error(tweet.split("\n")[0][:20])
col2.error("NEG")
col3.error(f"{round(p['score'] * 100, 1)}%")
log[2] = ("NEG")
log[3] = (f"{round(p['score'] * 100, 1)}%")
elif box == 'distilbert-base-uncased-finetuned-sst-2-english':
if p['label'] == "POSITIVE":
col1.success(tweet.split("\n")[0][:20])
log[0] = 0
col2.success("POSITIVE")
log[2] = "POSITIVE"
col3.success(f"{round(p['score'] * 100, 1)}%")
log[3] = f"{round(p['score'] * 100, 1)}%"
else:
col2.error("NEGATIVE")
col1.error(tweet.split("\n")[0][:20])
log[2] = ("NEGATIVE")
log[0] = 1
col3.error(f"{round(p['score'] * 100, 1)}%")
log[3] = f"{round(p['score'] * 100, 1)}%"
elif box == 'twitter-roberta-base-sentiment':
if p['label'] == "LABEL_2":
log[0] = 0
col1.success(tweet.split("\n")[0][:20])
col2.success("POSITIVE")
col3.success(f"{round(p['score'] * 100, 1)}%")
log[3] = f"{round(p['score'] * 100, 1)}%"
log[2] = "POSITIVE"
elif p['label'] == "LABEL_0":
log[0] = 1
col1.error(tweet.split("\n")[0][:20])
col2.error("NEGATIVE")
col3.error(f"{round(p['score'] * 100, 1)}%")
log[3] = f"{round(p['score'] * 100, 1)}%"
log[2] = "NEGATIVE"
else:
log[0] = 2
col1.warning(tweet.split("\n")[0][:20])
col2.warning("NEUTRAL")
col3.warning(f"{round(p['score'] * 100, 1)}%")
log[3] = f"{round(p['score'] * 100, 1)}%"
log[2] = "NEUTRAL"
# Print out the past inputs in reverse order
for a in st.session_state.logs[box][::-1]:
if a[0] == 0:
# Again, limit the printed tweet to 20 characters so everything lines up
col1.success(a[1].split("\n")[0][:20])
col2.success(a[2])
col3.success(a[3])
elif a[0] == 1:
col1.error(a[1].split("\n")[0][:20])
col2.error(a[2])
col3.error(a[3])
else:
col1.warning(a[1].split("\n")[0][:20])
col2.warning(a[2])
col3.warning(a[3])
# Add the log to the logs
st.session_state.logs[box].append(log)
# We used the finetuned model, so proceed below
else:
# Initialize log to the proper shape and store the tweet
log = [0] * 6
log[1] = tweet
# Check if nontoxic
if max(predictions) == 0:
# Only display the first 10 characters, as more columns means fewer characters fit per column (keeps everything lined up)
# Display and input the data as we go
col1.success(tweet.split("\n")[0][:10])
col2.success("NO TOXICITY")
col3.success(f"{100 - round(probs[0].item() * 100, 1)}%")
col4.success("N/A")
col5.success("N/A")
log[0] = 0
log[2] = "NO TOXICITY"
log[3] = (f"{100 - round(probs[0].item() * 100, 1)}%")
log[4] = ("N/A")
log[5] = ("N/A")
else:
# Find the highest-scoring overall toxicity category and the highest-scoring specific toxicity type
_max = 0
_max2 = 2
for i in range(1, len(predictions)):
if probs[i].item() > probs[_max].item():
_max = i
if i > 2 and probs[i].item() > probs[_max2].item():
_max2 = i
# Display and input the data as we go
col1.error(tweet.split("\n")[0][:10])
col2.error(st.session_state.labels[_max])
col3.error(f"{round(probs[_max].item() * 100, 1)}%")
col4.error(st.session_state.labels[_max2])
col5.error(f"{round(probs[_max2].item() * 100, 1)}%")
log[0] = 1
log[2] = (st.session_state.labels[_max])
log[3] = (f"{round(probs[_max].item() * 100, 1)}%")
log[4] = (st.session_state.labels[_max2])
log[5] = (f"{round(probs[_max2].item() * 100, 1)}%")
# Print out the past logs in reverse order
for a in st.session_state.logs[box][::-1]:
if a[0] == 0:
col1.success(a[1].split("\n")[0][:10])
col2.success(a[2])
col3.success(a[3])
col4.success(a[4])
col5.success(a[5])
elif a[0] == 1:
col1.error(a[1].split("\n")[0][:10])
col2.error(a[2])
col3.error(a[3])
col4.error(a[4])
col5.error(a[5])
else:
col1.warning(a[1].split("\n")[0][:10])
col2.warning(a[2])
col3.warning(a[3])
col4.warning(a[4])
col5.warning(a[5])
# Add result to logs
st.session_state.logs[box].append(log)