|
import streamlit as st |
|
import plotly.graph_objects as go |
|
from transformers import pipeline |
|
import re |
|
import time |
|
import requests |
|
from PIL import Image |
|
import itertools |
|
import numpy as np |
|
import matplotlib.pyplot as plt |
|
from matplotlib.colors import rgb2hex |
|
import matplotlib |
|
from matplotlib.colors import ListedColormap, rgb2hex |
|
import ipywidgets as widgets |
|
from IPython.display import display, HTML |
|
import re |
|
import pandas as pd |
|
from pprint import pprint |
|
from tenacity import retry |
|
from tqdm import tqdm |
|
import tiktoken |
|
import scipy.stats |
|
import torch |
|
from transformers import GPT2LMHeadModel |
|
import tiktoken |
|
import seaborn as sns |
|
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM |
|
|
|
import openai |
|
|
|
|
|
import streamlit as st |
|
def colorize_tokens(token_data, sentence): |
|
colored_sentence = "" |
|
start = 0 |
|
|
|
for token in token_data: |
|
entity_group = token["entity_group"] |
|
word = token["word"] |
|
tag = f"[{entity_group}]" |
|
tag_color = tag_colors.get(entity_group, "white") |
|
colored_chunk = f'<span style="color:black;background-color:{tag_color}">{word} {tag}</span>' |
|
colored_sentence += sentence[start:token["start"]] + colored_chunk |
|
start = token["end"] |
|
|
|
|
|
colored_sentence += sentence[start:] |
|
|
|
return colored_sentence |
|
|
|
|
|
tag_colors = { |
|
"ADJP": "#0000FF", |
|
"ADVP": "#008000", |
|
"CONJP": "#FF0000", |
|
"INTJ": "#00FFFF", |
|
"LST": "#FF00FF", |
|
"NP": "#FFFF00", |
|
"PP": "#800080", |
|
"PRT": "#00008B", |
|
"SBAR": "#006400", |
|
"VP": "#008B8B", |
|
} |
|
|
|
|
|
|
|
def generate_tagged_sentence(sentence, entity_tags): |
|
|
|
tagged_tokens = [] |
|
|
|
|
|
for tag in entity_tags: |
|
start = tag['start'] |
|
end = tag['end'] |
|
if end<len(sentence)-1: |
|
token = sentence[start:end] |
|
else: |
|
token = sentence[start:end+1] |
|
tag_name = f"[{tag['entity_group']}]" |
|
|
|
tagged_tokens.append(f"{token} {tag_name}") |
|
|
|
|
|
return " ".join(tagged_tokens) |
|
|
|
|
|
def replace_pp_with_pause(sentence, entity_tags): |
|
|
|
tagged_tokens = [] |
|
|
|
|
|
for tag in entity_tags: |
|
start = tag['start'] |
|
end = tag['end'] |
|
if end < len(sentence) - 1: |
|
token = sentence[start:end] |
|
else: |
|
token = sentence[start:end + 1] |
|
|
|
tag_name = '[PAUSE]' if tag['entity_group'] == 'PP' else '' |
|
tagged_tokens.append(f"{token}{tag_name}") |
|
print(tagged_tokens) |
|
|
|
|
|
modified_words = [] |
|
for i, word in enumerate(tagged_tokens): |
|
if word.startswith("'s"): |
|
modified_words[-1] = modified_words[-1] + word |
|
else: |
|
modified_words.append(word) |
|
|
|
output = " ".join(modified_words) |
|
|
|
return output |
|
|
|
|
|
|
|
def get_split_sentences(sentence, entity_tags): |
|
split_sentences = [] |
|
|
|
|
|
current_sentence = [] |
|
|
|
|
|
for tag in entity_tags: |
|
if tag['entity_group'] == 'PP': |
|
start = tag['start'] |
|
end = tag['end'] |
|
if end<len(sentence)-1: |
|
token = sentence[start:end] |
|
else: |
|
token = sentence[start:end+1] |
|
current_sentence.append(token) |
|
split_sentences.append(" ".join(current_sentence)) |
|
current_sentence = [] |
|
else: |
|
start = tag['start'] |
|
end = tag['end'] |
|
if end<len(sentence)-1: |
|
token = sentence[start:end] |
|
else: |
|
token = sentence[start:end+1] |
|
current_sentence.append(token) |
|
|
|
|
|
if current_sentence: |
|
split_sentences.append("".join(current_sentence)) |
|
|
|
return split_sentences |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
st.set_page_config(page_title="Hallucination", layout="wide") |
|
st.title(':blue[Sorry come again! This time slowly, please]') |
|
st.header("Rephrasing LLM Prompts for Better Comprehension Reduces :blue[Hallucination]") |
|
|
|
video_file1 = open('machine.mp4', 'rb') |
|
video_file2 = open('Pause 3 Out1.mp4', 'rb') |
|
video_bytes1 = video_file1.read() |
|
video_bytes2 = video_file2.read() |
|
col1a, col1b = st.columns(2) |
|
with col1a: |
|
st.caption("Original") |
|
st.video(video_bytes1) |
|
with col1b: |
|
st.caption("Paraphrased and added [PAUSE]") |
|
st.video(video_bytes2) |
|
|
|
HF_SPACES_API_KEY = st.secrets["HF_token"] |
|
|
|
|
|
API_URL = "https://api-inference.huggingface.co/models/bigscience/bloom" |
|
headers = {"Authorization": HF_SPACES_API_KEY} |
|
|
|
def query(payload): |
|
response = requests.post(API_URL, headers=headers, json=payload) |
|
return response.json() |
|
|
|
API_URL_chunk = "https://api-inference.huggingface.co/models/flair/chunk-english" |
|
|
|
def query_chunk(payload): |
|
response = requests.post(API_URL_chunk, headers=headers, json=payload) |
|
return response.json() |
|
|
|
|
|
|
|
from tenacity import ( |
|
retry, |
|
stop_after_attempt, |
|
wait_random_exponential, |
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
prompt = '''Generate a story from the given text. |
|
Text : ''' |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def render_heatmap(original_text, importance_scores_df): |
|
|
|
importance_values = importance_scores_df['importance_value'].values |
|
|
|
|
|
min_val = np.min(importance_values) |
|
max_val = np.max(importance_values) |
|
|
|
if max_val - min_val != 0: |
|
normalized_importance_values = (importance_values - min_val) / (max_val - min_val) |
|
else: |
|
normalized_importance_values = np.zeros_like(importance_values) |
|
|
|
|
|
cmap = matplotlib.cm.get_cmap('Blues') |
|
|
|
|
|
def get_text_color(bg_color): |
|
brightness = 0.299 * bg_color[0] + 0.587 * bg_color[1] + 0.114 * bg_color[2] |
|
if brightness < 0.5: |
|
return 'white' |
|
else: |
|
return 'black' |
|
|
|
|
|
original_pointer = 0 |
|
token_pointer = 0 |
|
|
|
|
|
html = "" |
|
while original_pointer < len(original_text): |
|
token = importance_scores_df.loc[token_pointer, 'token'] |
|
if original_pointer == original_text.find(token, original_pointer): |
|
importance = normalized_importance_values[token_pointer] |
|
rgba = cmap(importance) |
|
bg_color = rgba[:3] |
|
text_color = get_text_color(bg_color) |
|
html += f'<span style="background-color: rgba({int(bg_color[0]*255)}, {int(bg_color[1]*255)}, {int(bg_color[2]*255)}, 1); color: {text_color};">{token}</span>' |
|
original_pointer += len(token) |
|
token_pointer += 1 |
|
else: |
|
html += original_text[original_pointer] |
|
original_pointer += 1 |
|
|
|
st.markdown(html, unsafe_allow_html=True) |
|
|
|
|
|
|
|
|
|
prompt_list=["Which individuals possessed the ships that were part of the Boston Tea Party?", |
|
"Freddie Frith", "Robert used PDF for his math homework." |
|
] |
|
|
|
options = [f"Prompt #{i+1}: {prompt_list[i]}" for i in range(3)] + ["Another Prompt..."] |
|
selection = st.selectbox("Choose a prompt from the dropdown below . Click on :blue['Another Prompt...'] , if you want to enter your own custom prompt.", options=options) |
|
check=[] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if selection == "Another Prompt...": |
|
check = st.text_input("Enter your custom prompt...") |
|
check = " " + check |
|
if check: |
|
st.caption(f""":white_check_mark: Your input prompt is : {check}""") |
|
st.caption(':green[Kindly hold on for a few minutes while the AI text is being generated]') |
|
|
|
|
|
|
|
|
|
else: |
|
check = re.split(r'#\d+:', selection, 1)[1] |
|
if check: |
|
st.caption(f""":white_check_mark: Your input prompt is : {check}""") |
|
st.caption(':green[Kindly hold on for a few minutes while the AI text is being generated]') |
|
|
|
|
|
|
|
def load_chunk_model(check): |
|
iden=['error'] |
|
while 'error' in iden: |
|
time.sleep(1) |
|
try: |
|
output = query_chunk({"inputs": f"""{check}""",}) |
|
iden = output |
|
except Exception as e: |
|
print(f"An exception occurred: {e}") |
|
|
|
return output |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def load_text_gen_model(check): |
|
iden=['error'] |
|
while 'error' in iden: |
|
time.sleep(1) |
|
try: |
|
output = query({ |
|
"inputs": f"""{check}""", |
|
"parameters": { |
|
"min_new_tokens": 30, |
|
"max_new_tokens": 100, |
|
"do_sample":True, |
|
|
|
|
|
|
|
|
|
|
|
|
|
} |
|
}) |
|
iden = output |
|
except Exception as e: |
|
print(f"An exception occurred: {e}") |
|
|
|
return output[0]['generated_text'] |
|
|
|
|
|
|
|
|
|
|
|
|
|
def decoded_tokens(string, tokenizer): |
|
return [tokenizer.decode([x]) for x in tokenizer.encode(string)] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@st.cache_data |
|
def analyze_heatmap(df_input): |
|
df = df_input.copy() |
|
df["Position"] = range(len(df)) |
|
|
|
|
|
blues = matplotlib.cm.get_cmap("Blues") |
|
|
|
fig, ax = plt.subplots(figsize=(10, 6)) |
|
|
|
|
|
min_val = df["importance_value"].min() |
|
max_val = df["importance_value"].max() |
|
normalized_values = (df["importance_value"] - min_val) / (max_val - min_val) |
|
|
|
|
|
for i, (token, norm_value) in enumerate(zip(df["token"], normalized_values)): |
|
color = blues(norm_value) |
|
ax.bar( |
|
x=[i], |
|
height=[df["importance_value"].iloc[i]], |
|
width=1.0, |
|
color=[color], |
|
) |
|
|
|
|
|
|
|
|
|
|
|
ax.set_xticks(range(len(df["token"]))) |
|
ax.set_xticklabels(df["token"], rotation=45) |
|
|
|
return fig |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def integrated_gradients(input_ids, baseline, model, n_steps= 10): |
|
|
|
input_ids = input_ids.long() |
|
baseline = baseline.long() |
|
|
|
|
|
accumulated_grads = None |
|
|
|
|
|
alphas = torch.linspace(0, 1, n_steps) |
|
delta = input_ids - baseline |
|
interpolates = [(baseline + (alpha * delta).long()).long() for alpha in alphas] |
|
|
|
|
|
pbar = tqdm(total=n_steps, desc="Calculating Integrated Gradients") |
|
|
|
for interpolate in interpolates: |
|
|
|
|
|
pbar.update(1) |
|
|
|
|
|
interpolate_embedding = model.transformer.wte(interpolate).clone().detach().requires_grad_(True) |
|
|
|
|
|
output = model(inputs_embeds=interpolate_embedding, output_attentions=False)[0] |
|
|
|
|
|
aggregated_logit = output.sum() |
|
|
|
|
|
aggregated_logit.backward() |
|
|
|
|
|
if accumulated_grads is None: |
|
accumulated_grads = interpolate_embedding.grad.clone() |
|
else: |
|
accumulated_grads += interpolate_embedding.grad |
|
|
|
|
|
model.zero_grad() |
|
interpolate_embedding.grad.zero_() |
|
|
|
|
|
pbar.close() |
|
|
|
|
|
avg_grads = accumulated_grads / n_steps |
|
|
|
|
|
with torch.no_grad(): |
|
input_embedding = model.transformer.wte(input_ids) |
|
baseline_embedding = model.transformer.wte(baseline) |
|
attributions = (input_embedding - baseline_embedding) * avg_grads |
|
|
|
return attributions |
|
|
|
def process_integrated_gradients(input_text, _gpt2tokenizer, model): |
|
inputs = torch.tensor([_gpt2tokenizer.encode(input_text)]) |
|
|
|
gpt2tokens = decoded_tokens(input_text, _gpt2tokenizer) |
|
|
|
with torch.no_grad(): |
|
outputs = model(inputs, output_attentions=True, output_hidden_states=True) |
|
|
|
attentions = outputs[-1] |
|
|
|
|
|
baseline = torch.zeros_like(inputs).long() |
|
|
|
|
|
attributions = integrated_gradients(inputs, baseline, model) |
|
|
|
|
|
attributions_np = attributions.detach().numpy().sum(axis=2) |
|
|
|
|
|
attributions_sum = attributions.sum(axis=2).squeeze(0).detach().numpy() |
|
|
|
l2_norm_attributions = np.linalg.norm(attributions_sum, 2) |
|
normalized_attributions_sum = attributions_sum / l2_norm_attributions |
|
|
|
clamped_attributions_sum = np.where(normalized_attributions_sum < 0, 0, normalized_attributions_sum) |
|
|
|
attribution_df = pd.DataFrame({ |
|
'token': gpt2tokens, |
|
'importance_value': clamped_attributions_sum |
|
}) |
|
return attribution_df |
|
|
|
model_type = 'gpt2' |
|
model_version = 'gpt2' |
|
model = GPT2LMHeadModel.from_pretrained(model_version, output_attentions=True) |
|
_gpt2tokenizer = tiktoken.get_encoding("gpt2") |
|
|
|
para_tokenizer = AutoTokenizer.from_pretrained("humarin/chatgpt_paraphraser_on_T5_base") |
|
para_model = AutoModelForSeq2SeqLM.from_pretrained("humarin/chatgpt_paraphraser_on_T5_base") |
|
|
|
@st.cache_resource |
|
def paraphrase( |
|
question, |
|
num_beams=5, |
|
num_beam_groups=5, |
|
num_return_sequences=5, |
|
repetition_penalty=10.0, |
|
diversity_penalty=3.0, |
|
no_repeat_ngram_size=2, |
|
temperature=0.7, |
|
max_length=64 |
|
): |
|
input_ids = para_tokenizer( |
|
f'paraphrase: {question}', |
|
return_tensors="pt", padding="longest", |
|
max_length=max_length, |
|
truncation=True, |
|
).input_ids |
|
|
|
outputs = para_model.generate( |
|
input_ids, temperature=temperature, repetition_penalty=repetition_penalty, |
|
num_return_sequences=num_return_sequences, no_repeat_ngram_size=no_repeat_ngram_size, |
|
num_beams=num_beams, num_beam_groups=num_beam_groups, |
|
max_length=max_length, diversity_penalty=diversity_penalty |
|
) |
|
|
|
res = para_tokenizer.batch_decode(outputs, skip_special_tokens=True) |
|
|
|
return res |
|
|
|
|
|
|
|
class SentenceAnalyzer: |
|
def __init__(self, check, original, _gpt2tokenizer, model): |
|
self.check = check |
|
self.original = original |
|
self._gpt2tokenizer = _gpt2tokenizer |
|
self.model = model |
|
self.entity_tags = load_chunk_model(check) |
|
self.tagged_sentence = generate_tagged_sentence(check, self.entity_tags) |
|
self.sentence_with_pause = replace_pp_with_pause(check, self.entity_tags) |
|
self.split_sentences = get_split_sentences(check, self.entity_tags) |
|
self.colored_output = colorize_tokens(self.entity_tags, check) |
|
|
|
def analyze(self): |
|
|
|
|
|
attribution_df1 = process_integrated_gradients(self.check, self._gpt2tokenizer, self.model) |
|
st.caption(f":blue[{self.original}]:") |
|
render_heatmap(self.check, attribution_df1) |
|
|
|
st.pyplot(analyze_heatmap(attribution_df1)) |
|
|
|
|
|
dataframes_list = [] |
|
|
|
for i, split_sentence in enumerate(self.split_sentences): |
|
|
|
attribution_df1 = process_integrated_gradients(split_sentence, self._gpt2tokenizer, self.model) |
|
if i < len(self.split_sentences) - 1: |
|
|
|
pause_row = pd.DataFrame({'token': '[PAUSE]', 'importance_value': 0},index=[len(attribution_df1)]) |
|
|
|
attribution_df1 = pd.concat([attribution_df1,pause_row], ignore_index=True) |
|
|
|
dataframes_list.append(attribution_df1) |
|
|
|
|
|
if dataframes_list: |
|
combined_dataframe = pd.concat(dataframes_list, axis=0) |
|
combined_dataframe = combined_dataframe[combined_dataframe['token'] != " "].reset_index(drop=True) |
|
combined_dataframe1 = combined_dataframe[combined_dataframe['token'] != "[PAUSE]"] |
|
combined_dataframe1.reset_index(drop=True, inplace=True) |
|
st.write(f"Sentence with [PAUSE] Replacement:") |
|
|
|
render_heatmap(self.sentence_with_pause,combined_dataframe1) |
|
|
|
st.pyplot(analyze_heatmap(combined_dataframe)) |
|
|
|
|
|
paraphrase_list=paraphrase(check) |
|
|
|
|
|
|
|
col1, col2 = st.columns(2) |
|
with col1: |
|
analyzer = SentenceAnalyzer(check, "Original Prompt", _gpt2tokenizer, model) |
|
analyzer.analyze() |
|
with col2: |
|
ai_gen_text=load_text_gen_model(check) |
|
st.caption(':blue[AI generated text by GPT4]') |
|
st.write(ai_gen_text) |
|
|
|
|
|
st.markdown("""<hr style="height:5px;border:none;color:lightblue;background-color:lightblue;" /> """, unsafe_allow_html=True) |
|
|
|
|
|
col3, col4 = st.columns(2) |
|
with col3: |
|
analyzer = SentenceAnalyzer(" "+paraphrase_list[0], "Paraphrase 1", _gpt2tokenizer, model) |
|
analyzer.analyze() |
|
with col4: |
|
ai_gen_text=load_text_gen_model(paraphrase_list[0]) |
|
st.caption(':blue[AI generated text by GPT4]') |
|
st.write(ai_gen_text) |
|
|
|
st.markdown("""<hr style="height:5px;border:none;color:lightblue;background-color:skyblue;" /> """, unsafe_allow_html=True) |
|
|
|
col5, col6 = st.columns(2) |
|
with col5: |
|
analyzer = SentenceAnalyzer(" "+paraphrase_list[1], "Paraphrase 2", _gpt2tokenizer, model) |
|
analyzer.analyze() |
|
with col6: |
|
ai_gen_text=load_text_gen_model(paraphrase_list[1]) |
|
st.caption(':blue[AI generated text by GPT4]') |
|
st.write(ai_gen_text) |
|
|
|
st.markdown("""<hr style="height:5px;border:none;color:lightblue;background-color:skyblue;" /> """, unsafe_allow_html=True) |
|
|
|
col7, col8 = st.columns(2) |
|
with col7: |
|
analyzer = SentenceAnalyzer(" "+paraphrase_list[2], "Paraphrase 3", _gpt2tokenizer, model) |
|
analyzer.analyze() |
|
with col8: |
|
ai_gen_text=load_text_gen_model(paraphrase_list[2]) |
|
st.caption(':blue[AI generated text by GPT4]') |
|
st.write(ai_gen_text) |
|
|
|
st.markdown("""<hr style="height:5px;border:none;color:lightblue;background-color:skyblue;" /> """, unsafe_allow_html=True) |
|
|
|
col9, col10 = st.columns(2) |
|
with col9: |
|
analyzer = SentenceAnalyzer(" "+paraphrase_list[3], "Paraphrase 4", _gpt2tokenizer, model) |
|
analyzer.analyze() |
|
with col10: |
|
ai_gen_text=load_text_gen_model(paraphrase_list[3]) |
|
st.caption(':blue[AI generated text by GPT4]') |
|
st.write(ai_gen_text) |
|
|
|
st.markdown("""<hr style="height:5px;border:none;color:lightblue;background-color:skyblue;" /> """, unsafe_allow_html=True) |
|
|
|
col11, col12 = st.columns(2) |
|
with col11: |
|
analyzer = SentenceAnalyzer(" "+paraphrase_list[4], "Paraphrase 5", _gpt2tokenizer, model) |
|
analyzer.analyze() |
|
with col12: |
|
ai_gen_text=load_text_gen_model(paraphrase_list[4]) |
|
st.caption(':blue[AI generated text by GPT4]') |
|
st.write(ai_gen_text) |
|
|
|
|