Quantx-WhatsApp / main.py
williamagyapong's picture
Update
e011e39 verified
raw
history blame
5.96 kB
from flask import Flask,request,make_response
import os
import logging
from dotenv import load_dotenv
from heyoo import WhatsApp
import random
import shutil
from tempfile import NamedTemporaryFile
import assemblyai as aai
from pandasai.llm import GoogleGemini
from pandasai import SmartDataframe
from pandasai import Agent
from pandasai.responses.response_parser import ResponseParser
from langchain_experimental.agents import create_pandas_dataframe_agent
import pandas as pd
from langchain_google_genai import GoogleGenerativeAI
from langchain_google_genai.chat_models import ChatGoogleGenerativeAI
import requests
import base64
from pandasai.helpers import path
import uuid
import pandasai as pai
# load env data
load_dotenv()
# messenger object
messenger = WhatsApp(
os.environ["whatsapp_token"],
phone_number_id=os.environ["phone_number_id"] )
# aai.settings.api_key = os.environ["aai_key"]
# transcriber = aai.Transcriber()
app = Flask(__name__)
VERIFY_TOKEN = "30cca545-3838-48b2-80a7-9e43b1ae8ce4"
client = openai.OpenAI(
api_key=os.environ.get("sambanova_api_key"),
base_url="https://api.sambanova.ai/v1",
)
def generateResponse(prompt):
#----- Call API to classify and extract relevant transaction information
# These templates help provide a unified response format for use as context clues when
# parsing the AI generated response into a structured data format
relevant_info_template = """
Intent: The CRUD operation
Transaction Type: The type of transaction
Details: as a sublist of the key details like name of item, amount, description, among other details you are able to extract.
"""
sample_response_template = """
The information provided indicates that you want to **create/record** a new transaction.
**Extracted Information**:
**Intent**: Create
Transaction 1:
**Transaction Type**: Purchase
**Details**:
- Item: Car
- Purpose: Business
- Amount: 1000
- Tax: 200
- Note: A new car for business
Transaction 2:
**Transaction Type**: Expense
**Details**:
- Item: Office Chair
- Amount: 300 USD
- Category: Furniture
"""
response = client.chat.completions.create(
model='Meta-Llama-3.1-70B-Instruct',
messages=[
{"role": "system", "content": f"You are a helpful assistant that classifies transactions written in natural language into CRUD operations (Create, Read, Update, and Delete) and extracts relevant information. Format the relevant information extracted from the transaction text in this format: {relevant_info_template}. You can use markdown syntax to present a nicely formated and readable response to the user, but make sure the user does not see the markdown keyword. Keywords and field names must be in bold face. A sample response could look like this: {sample_response_template}. Delineate multiple transactions with the label 'Transaction 1' before the start of the relevant information for each transaction. There should be only one intent even in the case of multiple transactions."},
{"role": "user", "content": prompt}
]
)
#----- Process response
try:
response = response.choices[0].message.content
except Exception as e:
print(f'An error occurred: {str(e)}')
response = None
return response
def respond(query_str:str):
response = "hello, I don't have a brain yet"
return response
@app.route("/", methods=["GET", "POST"])
def hook():
if request.method == "GET":
if request.args.get("hub.verify_token") == VERIFY_TOKEN:
logging.info("Verified webhook")
response = make_response(request.args.get("hub.challenge"), 200)
response.mimetype = "text/plain"
return response
logging.error("Webhook Verification failed")
return "Invalid verification token"
# get message update..
data = request.get_json()
changed_field = messenger.changed_field(data)
if changed_field == "messages":
new_message = messenger.get_mobile(data)
if new_message:
mobile = messenger.get_mobile(data)
message_type = messenger.get_message_type(data)
if message_type == "text":
message = messenger.get_message(data)
# Handle greetings
if message.lower() in ("hi", "hello", "help", "how are you"):
response = "Hi there! My name is BuzyHelper. How can I help you today?"
messenger.send_message(message=f"{response}",recipient_id=mobile)
else:
response = str(generateResponse(message))
print("Response:", response)
logging.info(f"\nAnswer: {response}\n")
# Handle cases where response is not a valid image path
messenger.send_message(message=f"{response}", recipient_id=mobile)
# elif message_type == "audio":
# audio = messenger.get_audio(data)
# audio_id, mime_type = audio["id"], audio["mime_type"]
# audio_url = messenger.query_media_url(audio_id)
# audio_filename = messenger.download_media(audio_url, mime_type)
# transcript =transcriber.transcribe(audio_filename)
# print(audio_filename)
# print(transcript.text)
# res = transcript.text
# logging.info(f"\nAudio: {audio}\n")
# response = str(generateResponse(res))
# if isinstance(response, str):
# messenger.send_message(message=f"{response}", recipient_id=mobile)
# elif isinstance(response, str) and os.path.isfile(response):
# messenger.send_image(image_path=response, recipient_id=mobile)
else:
messenger.send_message(message="Please send me text or audio messages",recipient_id=mobile)
return "ok"
if __name__ == '__main__':
app.run(debug=True,host="0.0.0.0", port=7860)