import gradio as gr
import pandas as pd
import re
import spacy
import textstat
from sklearn.cluster import KMeans
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.feature_extraction.text import TfidfVectorizer
from sentence_transformers import SentenceTransformer, util
from transformers import pipeline, AutoTokenizer
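
# Pipeline overview: a review is segmented into strengths / weaknesses /
# general discussion, scored for sentiment, relevance to the abstract,
# readability, and information density, then combined into a composite
# quality score with heuristic improvement suggestions.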

# DistilBERT sentiment model plus its tokenizer (the tokenizer is reused to
# split long reviews into chunks that fit the model's context window).
sentiment_analyzer = pipeline(
    "sentiment-analysis",
    model="distilbert-base-uncased-finetuned-sst-2-english",
)
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased-finetuned-sst-2-english")

# spaCy pipeline for sentence segmentation and named-entity counting.
nlp = spacy.load("en_core_web_sm")

# Sentence-transformer used for sentence and whole-review embeddings.
model = SentenceTransformer("all-MiniLM-L6-v2")

# Relative weight of each feature in the composite quality score.
weights = {
    "information_density": 0.2,
    "unique_key_points": 0.8,
    "strength_word_count": 0.002,
    "weakness_word_count": 0.004,
    "discussion_word_count": 0.01,
}

# Heuristic cut-offs used by the optimization suggestions; tuples are
# (lower, upper) bounds, scalars are single thresholds.
THRESHOLDS = {
    "normalized_length": (0.15, 0.25),
    "unique_key_points": (3, 10),
    "information_density": (0.01, 0.02),
    "unique_insights_per_word": 0.002,
    "optimization_score": 0.7,
    "composite_score": 5,
    "adjusted_argument_strength": 0.75,
}

def chunk_text(text, max_length):
    """Split text into decoded chunks of at most max_length tokens."""
    # Tokenize without special tokens so chunk boundaries stay clean, then
    # decode each token window back to a string for the sentiment pipeline.
    tokens = tokenizer(text, add_special_tokens=False, truncation=False)["input_ids"]
    return [
        tokenizer.decode(tokens[i:i + max_length])
        for i in range(0, len(tokens), max_length)
    ]
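
# Illustrative usage (exact chunk boundaries depend on the tokenizer):
#   chunk_text("a very long review ...", max_length=200)
#   -> ["first ~200-token span", "next ~200-token span", ...]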

def analyze_text(texts):
    """Run chunked sentiment analysis and majority-vote a label per text."""
    results = []
    for text in texts:
        chunks = chunk_text(text, max_length=200)
        if not chunks:  # empty text: fall back to a zero-confidence default
            results.append({"label": "NEGATIVE", "score": 0.0})
            continue
        chunk_results = sentiment_analyzer(chunks)
        positives = sum(1 for res in chunk_results if res["label"] == "POSITIVE")
        overall_sentiment = {
            # Majority vote across chunks, with the mean score as confidence.
            "label": "POSITIVE" if positives >= len(chunk_results) / 2 else "NEGATIVE",
            "score": sum(res["score"] for res in chunk_results) / len(chunk_results),
        }
        results.append(overall_sentiment)
    return results
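
# Illustrative output shape (the score is an example, not real model output):
#   analyze_text(["The method is novel and well evaluated."])
#   -> [{"label": "POSITIVE", "score": 0.99}]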

def word_count(text):
    """Whitespace word count; non-strings count as zero."""
    return len(text.split()) if isinstance(text, str) else 0


def count_citations(text):
    """Proxy for evidence support: count entity types that often mark
    citations (titles, organizations, places). A heuristic, not a real
    citation parser."""
    doc = nlp(text)
    return sum(1 for ent in doc.ents if ent.label_ in ["WORK_OF_ART", "ORG", "GPE"])

def calculate_unique_insights_per_word(text):
    """Approximate insight diversity as one minus the mean pairwise TF-IDF
    cosine similarity between sentences."""
    sentences = [s for s in text.split(".") if s.strip()]
    if len(sentences) < 2:  # pairwise similarity needs at least two sentences
        return 0
    tfidf = TfidfVectorizer().fit_transform(sentences)
    similarities = cosine_similarity(tfidf)
    # Subtract the diagonal (self-similarity of 1) and average the off-diagonal.
    n = len(sentences)
    avg_similarity = (similarities.sum() - n) / (n**2 - n)
    return 1 - avg_similarity

def calculate_unique_key_points_and_density(texts):
    """Cluster sentence embeddings with KMeans; the number of clusters is a
    proxy for unique key points, and density is key points per word."""
    unique_key_points = []
    information_density = []

    for text in texts:
        if not isinstance(text, str) or text.strip() == "":
            unique_key_points.append(0)
            information_density.append(0)
            continue

        doc = nlp(text)
        sentences = [sent.text for sent in doc.sents]

        embeddings = model.encode(sentences)

        # Roughly one cluster per five sentences, with at least one cluster.
        n_clusters = max(1, len(sentences) // 5)
        kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
        kmeans.fit(embeddings)

        unique_points_count = len(kmeans.cluster_centers_)

        n_words = len(text.split())  # avoid shadowing the word_count() helper
        density = unique_points_count / n_words if n_words > 0 else 0

        unique_key_points.append(unique_points_count)
        information_density.append(density)

    return unique_key_points, information_density

def segment_comments(comments):
    """Split a review into its strengths / weaknesses / general-discussion
    sections based on the expected headings."""
    if not isinstance(comments, str) or comments == "N/A":
        return {"strengths": "", "weaknesses": "", "general_discussion": ""}

    strengths = re.search(r"- Strengths:\n([\s\S]*?)(\n- Weaknesses:|\Z)", comments)
    weaknesses = re.search(r"- Weaknesses:\n([\s\S]*?)(\n- General Discussion:|\Z)", comments)
    general_discussion = re.search(r"- General Discussion:\n([\s\S]*?)\Z", comments)

    return {
        "strengths": strengths.group(1).strip() if strengths else "",
        "weaknesses": weaknesses.group(1).strip() if weaknesses else "",
        "general_discussion": general_discussion.group(1).strip() if general_discussion else "",
    }
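
# Expected review layout (sections missing from the text come back empty):
#   - Strengths:
#   <strength text>
#   - Weaknesses:
#   <weakness text>
#   - General Discussion:
#   <discussion text>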

def preprocess(comment, abstract):
    """Build a one-row feature DataFrame for a (review, abstract) pair."""
    df = pd.DataFrame({"comments": [comment]})
    abstracts = pd.DataFrame({"abstract": [abstract]})

    # Section segmentation.
    segmented_reviews = df["comments"].apply(segment_comments)
    df["strengths"] = segmented_reviews.apply(lambda x: x["strengths"])
    df["weaknesses"] = segmented_reviews.apply(lambda x: x["weaknesses"])
    df["general_discussion"] = segmented_reviews.apply(lambda x: x["general_discussion"])

    # Semantic relevance of the review to the paper abstract.
    comments_embeddings = model.encode(df["comments"].tolist(), convert_to_tensor=True)
    abstract_embeddings = model.encode(abstracts["abstract"].tolist(), convert_to_tensor=True)
    df["content_relevance"] = util.cos_sim(comments_embeddings, abstract_embeddings).diagonal().cpu().numpy()

    df["evidence_support"] = df["comments"].apply(count_citations)

    # Sentiment-based argument scores for each section.
    df["strengths"] = df["strengths"].fillna("").astype(str)
    results = analyze_text(df["strengths"].tolist())
    df["strength_argument_score"] = [result["score"] for result in results]

    df["weaknesses"] = df["weaknesses"].fillna("").astype(str)
    results = analyze_text(df["weaknesses"].tolist())
    df["weakness_argument_score"] = [result["score"] for result in results]

    df["argument_strength"] = (df["strength_argument_score"] + df["weakness_argument_score"]) / 2

    # Readability and structural features.
    df["readability_index"] = df["comments"].apply(textstat.flesch_reading_ease)
    df["sentence_complexity"] = df["comments"].apply(textstat.sentence_count)
    # Clip to at least one sentence to avoid dividing by zero on empty input.
    df["technical_depth"] = df["readability_index"] / df["sentence_complexity"].clip(lower=1)

    df["total_word_count"] = df["comments"].apply(word_count)
    df["strength_word_count"] = df["strengths"].apply(word_count)
    df["weakness_word_count"] = df["weaknesses"].apply(word_count)
    df["discussion_word_count"] = df["general_discussion"].apply(word_count)

    # With a single review this normalizes to 1; it is recomputed against the
    # batch maximum in determine_review_quality().
    average_length = df["total_word_count"].mean()
    df["normalized_length"] = df["total_word_count"] / average_length
    df["unique_key_points"], df["information_density"] = calculate_unique_key_points_and_density(df["comments"])

    df["unique_insights_per_word"] = (
        df["comments"].apply(calculate_unique_insights_per_word) / df["total_word_count"].clip(lower=1)
    )

    return df
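
# preprocess() leaves one row per review with the feature columns used below:
# relevance, evidence support, argument scores, readability, the four word
# counts, normalized length, unique key points, and information density.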

def calculate_composite_score(df):
    """Weighted linear combination of density, key-point, and word-count
    features (see the weights dict above)."""
    df["composite_score"] = (
        weights["information_density"] * df["information_density"]
        + weights["unique_key_points"] * df["unique_key_points"]
        + weights["strength_word_count"] * df["strength_word_count"]
        + weights["weakness_word_count"] * df["weakness_word_count"]
        + weights["discussion_word_count"] * df["discussion_word_count"]
    )
    return df
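
# Worked example with assumed, illustrative feature values: density 0.015,
# 5 key points, and 80/120/200 section word counts give
#   0.2*0.015 + 0.8*5 + 0.002*80 + 0.004*120 + 0.01*200 ≈ 6.64,
# which classify_review_quality() below rates as moderate quality.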

def classify_review_quality(row):
    """Map the composite score onto a three-level quality label."""
    if row["composite_score"] > 12:
        return "Excellent Review Quality"
    elif row["composite_score"] < 3:
        return "Poor Review Quality"
    else:
        return "Moderate Review Quality"

def determine_review_quality(df):
    # Re-normalize length against the longest review in the batch and derive
    # complexity-adjusted features (these overwrite the preprocess() values).
    df["normalized_length"] = df["total_word_count"] / df["total_word_count"].max()
    df["unique_insights_per_word"] = df["unique_key_points"] / df["normalized_length"]
    df["adjusted_argument_strength"] = df["argument_strength"] / (1 + df["sentence_complexity"])

    df["review_quality"] = df.apply(classify_review_quality, axis=1)

    return df

def heuristic_optimization(row):
    """Rule-based suggestions driven by the section features and THRESHOLDS."""
    suggestions = []

    if row["strength_word_count"] > 100 and row["strength_argument_score"] < THRESHOLDS["adjusted_argument_strength"]:
        suggestions.append("Summarize redundant strengths.")
    elif row["strength_word_count"] < 50 and row["strength_argument_score"] < THRESHOLDS["adjusted_argument_strength"]:
        suggestions.append("Add more impactful strengths.")

    if row["weakness_word_count"] > 100 and row["weakness_argument_score"] < THRESHOLDS["adjusted_argument_strength"]:
        suggestions.append("Remove repetitive criticisms.")
    elif row["weakness_word_count"] < 50 and row["weakness_argument_score"] < THRESHOLDS["adjusted_argument_strength"]:
        suggestions.append("Add specific, actionable weaknesses.")

    if row["discussion_word_count"] < 100 and row["information_density"] < THRESHOLDS["information_density"][0]:
        suggestions.append("Elaborate with new insights or examples.")
    elif row["discussion_word_count"] > 300 and row["information_density"] > THRESHOLDS["information_density"][1]:
        suggestions.append("Summarize key discussion points.")

    if row["normalized_length"] < THRESHOLDS["normalized_length"][0]:
        suggestions.append("Expand sections for better coverage.")
    elif row["normalized_length"] > THRESHOLDS["normalized_length"][1]:
        suggestions.append("Condense content to improve readability.")

    if row["unique_key_points"] < THRESHOLDS["unique_key_points"][0]:
        suggestions.append("Add more unique insights.")
    elif row["unique_key_points"] > THRESHOLDS["unique_key_points"][1]:
        suggestions.append("Streamline ideas for clarity.")

    if row["composite_score"] < THRESHOLDS["composite_score"]:
        suggestions.append("Enhance clarity, evidence, and argumentation.")

    # These labels must match classify_review_quality() exactly.
    if row["review_quality"] == "Poor Review Quality":
        suggestions.append("Significant revisions required.")
    elif row["review_quality"] == "Moderate Review Quality":
        suggestions.append("Minor refinements recommended.")

    return suggestions

def review_pipeline(comment, abstract):
    """End-to-end scoring: features -> composite score -> quality label plus
    suggestions. Named review_pipeline to avoid shadowing transformers.pipeline."""
    df = preprocess(comment, abstract)
    df = calculate_composite_score(df)
    df = determine_review_quality(df)
    df["optimization_suggestions"] = df.apply(heuristic_optimization, axis=1)
    return df["review_quality"][0], " ".join(df["optimization_suggestions"][0])
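
# Illustrative call (the review text must follow the section layout above):
#   quality, tips = review_pipeline(
#       "- Strengths:\nNovel idea.\n- Weaknesses:\nNo baselines.\n- General Discussion:\nSolid overall.",
#       "We propose ...",
#   )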

# Blocks layout that re-scores the review on every edit; only the Interface
# below is launched by default.
with gr.Blocks() as demo:
    gr.Markdown("# Dynamic Length Optimization of Peer Review")
    with gr.Row():
        comment = gr.Textbox(label="Peer Review Comments")
        abstract = gr.Textbox(label="Paper Abstract")
    review_quality = gr.Textbox(label="Predicted Review Quality")
    suggestions = gr.Textbox(label="Suggestions")

    comment.change(fn=review_pipeline, inputs=[comment, abstract], outputs=[review_quality, suggestions])

iface = gr.Interface(
    fn=review_pipeline,
    inputs=[gr.Textbox(label="Peer Review Comments"), gr.Textbox(label="Paper Abstract")],
    outputs=[gr.Textbox(label="Predicted Review Quality"), gr.Textbox(label="Suggestions")],
    title="Dynamic Length Optimization of Peer Review",
    description="A framework that dynamically provides suggestions to improve a peer review.",
)


if __name__ == "__main__":
    iface.launch()