Spaces:
Sleeping
Sleeping
""" | |
Module for detecting fallacies in text. | |
""" | |
import os | |
import re | |
import time | |
import json | |
import csv | |
from ast import literal_eval | |
from collections import namedtuple | |
import requests | |
from langchain_community.llms import HuggingFaceEndpoint | |
from langchain_community.chat_models.huggingface import ChatHuggingFace | |
from langchain.agents import AgentExecutor, load_tools, create_react_agent | |
from langchain.agents.output_parsers import ReActJsonSingleInputOutputParser | |
from langchain.tools import Tool | |
from langchain.tools import DuckDuckGoSearchRun | |
from .templates import ( | |
REACT, | |
INCONTEXT, | |
SUMMARIZATION, | |
CONCLUDING, | |
CONCLUDING_INCONTEXT, | |
) | |
from .definitions import DEFINITIONS | |
from .examples import FALLACY_CLAIMS, DEBUNKINGS | |
os.environ["HUGGINGFACEHUB_API_TOKEN"] = os.environ.get("HF_API_KEY") | |
class HamburgerStyle: | |
def __init__(self): | |
# hamburger-style structure: | |
self.heading = namedtuple("Heading", ["name", "content"]) | |
self.hamburger = [ | |
self.heading(name="Myth", content=None), | |
self.heading(name="##FACT", content=None), | |
self.heading(name="##MYTH", content=None), | |
self.heading(name="##FALLACY", content=None), | |
self.heading(name="##FACT", content=None), | |
] | |
self.llm = HuggingFaceEndpoint( | |
repo_id="mistralai/Mixtral-8x7B-Instruct-v0.1", | |
temperature=1, | |
top_k=1, | |
model_kwargs={ | |
"use_cache": False, | |
}, | |
) | |
self.chat_model = ChatHuggingFace(llm=self.llm) | |
self.flicc_model = "fzanartu/flicc" | |
self.card_model = "crarojasca/BinaryAugmentedCARDS" | |
self.semantic_textual_similarity = "sentence-transformers/all-MiniLM-L6-v2" | |
self.taxonomy_cards = "crarojasca/TaxonomyAugmentedCARDS" | |
self.dirname = os.path.dirname(os.path.abspath("__file__")) | |
self.filename = os.path.join(self.dirname, "utils/climate_fever_cards.csv") | |
def generate_st_layer(self, misinformation): | |
## FACT: ReAct | |
prompt = REACT | |
# define the agent | |
chat_model_with_stop = self.chat_model.bind(stop=["\nObservation"]) | |
agent = ( | |
{ | |
"input": lambda x: x["input"], | |
"agent_scratchpad": lambda x: format_log_to_str( | |
x["intermediate_steps"] | |
), | |
} | |
| prompt | |
| self.chat_model | |
| ReActJsonSingleInputOutputParser() | |
) | |
search = DuckDuckGoSearchRun() | |
tools = [ | |
Tool( | |
name="google_search", | |
description="Search Google for recent results.", | |
func=search.run, | |
) | |
] | |
agent = create_react_agent(chat_model_with_stop, tools, prompt) | |
agent_executor = AgentExecutor( | |
agent=agent, tools=tools, verbose=False, handle_parsing_errors=True | |
) | |
return agent_executor.invoke({"input": misinformation}).get("output") | |
def generate_nd_layer(self, misinformation): | |
## MYTH: Summ | |
prompt = SUMMARIZATION | |
chain = prompt | self.llm | |
return chain.invoke({"text": misinformation}) | |
def generate_rd_layer(self, misinformation): | |
## FALLACY: Fallacy | |
# 1 predict fallacy label in FLICC taxonomy | |
detected_fallacy = self.endpoint_query( | |
model=self.flicc_model, payload=misinformation | |
)[0][0].get("label") | |
fallacy_definition = DEFINITIONS.get(detected_fallacy) | |
# 2 get all examples with the same label | |
claims = FALLACY_CLAIMS.get(detected_fallacy, None) | |
# 3 get cosine similarity for all claims and myth | |
example_myths = self.endpoint_query( | |
payload={"source_sentence": misinformation, "sentences": claims}, | |
model=self.semantic_textual_similarity, | |
) | |
# 3 # get most similar claim and FACT | |
max_similarity = example_myths.index(max(example_myths)) | |
example_myth = claims[max_similarity] | |
example_response = DEBUNKINGS.get(claims[max_similarity]) | |
fact = re.findall(r"## FALLACY:.*?(?=##)", example_response, re.DOTALL)[ | |
0 | |
] # get only the fallacy layer from the example. | |
fact = fact.replace("## FALLACY:", "") | |
prompt = INCONTEXT | |
chain = prompt | self.chat_model | |
content = chain.invoke( | |
{ | |
"misinformation": misinformation, | |
"detected_fallacy": detected_fallacy, | |
"fallacy_definition": fallacy_definition, | |
"example_response": fact, | |
"example_myth": example_myth, | |
"factual_information": self.hamburger[1].content, | |
} | |
).content | |
content = re.sub(r"Response:", "", content) | |
return content | |
def generate_th_layer(self, misinformation): | |
## FACT: Concluding | |
cards_label = self.endpoint_query( | |
model=self.taxonomy_cards, payload=misinformation | |
)[0][0].get("label") | |
# 1 get all claims with same label from FEVER dataset | |
claims = self.get_fever_claims(cards_label) # TODO | |
prompt_completition = {"fact": self.hamburger[1].content} | |
if claims: | |
prompt = CONCLUDING_INCONTEXT | |
example_myths = self.endpoint_query( | |
payload={ | |
"input": {"source_sentence": misinformation, "sentences": claims} | |
}, | |
model=self.semantic_textual_similarity, | |
) | |
max_similarity = example_myths.index(max(example_myths)) | |
example_myth = claims[max_similarity] | |
complementary_details = self.get_fever_evidence(example_myth) # TODO | |
prompt_completition.update({"complementary_details": complementary_details}) | |
else: | |
prompt = CONCLUDING | |
chain = prompt | self.llm | |
return chain.invoke(prompt_completition) | |
def rebuttal_generator(self, misinformation): | |
# generate rebuttal | |
self.hamburger[0] = self.hamburger[0]._replace(content=misinformation) | |
## FACT | |
self.hamburger[1] = self.hamburger[1]._replace( | |
content=self.generate_st_layer(misinformation).strip() | |
) | |
## MYTH | |
self.hamburger[2] = self.hamburger[2]._replace( | |
content=self.generate_nd_layer(misinformation).strip() | |
) | |
## FALLACY | |
self.hamburger[3] = self.hamburger[3]._replace( | |
content=self.generate_rd_layer(misinformation).strip() | |
) | |
## FACT | |
self.hamburger[4] = self.hamburger[4]._replace( | |
content=self.generate_th_layer(misinformation).strip() | |
) | |
# compose and format the string | |
rebuttal = f"""{self.hamburger[1].name}: {self.hamburger[1].content}\n{self.hamburger[2].name}: {self.hamburger[2].content}\n{self.hamburger[3].name}: {self.hamburger[3].content}\n{self.hamburger[4].name}: {self.hamburger[4].content}""" | |
return rebuttal | |
def endpoint_query(self, payload, model): | |
headers = {"Authorization": f"Bearer {os.environ['HUGGINGFACEHUB_API_TOKEN']}"} | |
options = {"use_cache": False, "wait_for_model": True} | |
payload = {"inputs": payload, "options": options} | |
api_url = f"https://api-inference.huggingface.co/models/{model}" | |
response = requests.post(api_url, headers=headers, json=payload, timeout=120) | |
return json.loads(response.content.decode("utf-8")) | |
def retry_on_exceptions(self, function, *args): | |
attempt = 0 | |
while attempt < 5: | |
try: | |
return function(*args) | |
except (KeyError, ValueError): | |
print("retrying %d out of 5", attempt + 1) | |
time.sleep(5 * (attempt + 1)) | |
attempt += 1 | |
continue | |
# Return None if no response after five attempts | |
return None | |
def get_fever_claims(self, label): | |
claims = [] | |
with open(self.filename, "r", encoding="utf-8") as csvfile: | |
reader = csv.DictReader(csvfile) | |
for row in reader: | |
if row["claim_label"] == 1 and row["CARDS_label"] == label: | |
claims.append(row["claim"]) | |
return claims | |
def get_fever_evidence(self, claim): | |
evidences = [] | |
with open(self.filename, "r", encoding="utf-8") as csvfile: | |
reader = csv.DictReader(csvfile) | |
for row in reader: | |
if row["claim_label"] == 1 and row["claim"] == claim: | |
for evidence_dict in literal_eval(row["evidences"]): | |
evidences.append(evidence_dict["evidence"]) | |
return "\n".join("* " + evidence for evidence in evidences) | |