from flask import Flask, request, make_response |
import os |
from datetime import datetime |
import logging |
from dotenv import load_dotenv |
from heyoo import WhatsApp |
import assemblyai as aai |
import openai |
from utility import generateResponse, parse_multiple_transactions, create_inventory, create_sale, read_datalake |
from google.cloud import firestore |
import ast |
load_dotenv() |
messenger = WhatsApp( |
os.environ["whatsapp_token"], |
phone_number_id=os.environ["phone_number_id"] |
) |
aai.settings.api_key = os.environ["aai_key"] |
transcriber = aai.Transcriber() |
db = firestore.Client.from_service_account_json("firestore-key.json") |
app = Flask(__name__) |
VERIFY_TOKEN = "30cca545-3838-48b2-80a7-9e43b1ae8ce4" |
client = openai.OpenAI( |
api_key=os.environ.get("sambanova_api_key"), |
base_url="https://api.sambanova.ai/v1", |
) |
def messenger_button(recipient_phone, message, header='Transaction Confirmation', footer='', btn_name='Confirm Details'): |
messenger.send_button( |
recipient_id=recipient_phone, |
button={ |
"header": f"{header}", |
"body": f"{message}", |
"footer": f"{footer}", |
"action": { |
"button": f"{btn_name}", |
"sections": [ |
{ |
"title": "iBank", |
"rows": [ |
{"id": "confirm", "title": "Record Transaction", "description": ""}, |
{"id": "cancel", "title": "Cancel Transaction", "description": ""}, |
], |
} |
], |
}, |
}, |
) |
def messenger_reply_button(recipient_phone, message, header = 'Transaction Confirmation'): |
header_message = "*Please confirm the details below before we proceed*:\n\n" |
messenger.send_reply_button( |
recipient_id=f"{recipient_phone}", |
button={ |
"type": "button", |
"body": { |
"text": f"{header_message} {message}" |
}, |
"action": { |
"buttons": [ |
{ |
"type": "reply", |
"reply": { |
"id": "confirm", |
"title": "Confirm" |
} |
}, |
{ |
"type": "reply", |
"reply": { |
"id": "cancel", |
"title": "Cancel" |
} |
} |
] |
} |
}, |
) |
def respond(query_str: str): |
response = "hello, I don't have a brain yet" |
return response |
def persist_temporary_transaction(transactions, mobile): |
""" |
Persists the transaction data temporarily in Firestore. |
""" |
temp_ref = db.collection("users").document(mobile).collection("temp_transactions").document('pending-user-action') |
data = { |
"transactions": transactions, |
"status": "pending", |
"created_at": datetime.now().isoformat() |
} |
temp_ref.set(data) |
return True |
def process_user_msg(message, mobile): |
response = str(generateResponse(message)) |
parsed_trans_data = parse_multiple_transactions(response) |
logging.info(f"\nAnswer: {response}\n") |
intent = parsed_trans_data[0]['intent'].lower() |
if intent == 'read': |
response2 = str(read_datalake(mobile, message)) |
messenger.send_message(f"*Raw Response*:\n {response}, \n \n *Parsed Response*:\n {parsed_trans_data}, \n \n *Final Response*:\n {response2}", recipient_id=mobile) |
else: |
persist_temporary_transaction(parsed_trans_data, mobile) |
messenger_reply_button(mobile, f"*Raw Response*:\n {response}, \n \n *Parsed Response*:\n {parsed_trans_data}") |
return True |
def process_intent(parsed_trans_data, mobile): |
intent = parsed_trans_data[0]['intent'].lower() |
trans_type = parsed_trans_data[0]['transaction_type'].lower() |
if intent == 'create': |
if trans_type == 'purchase': |
if create_inventory(mobile, parsed_trans_data): |
firestore_msg = "Transaction recorded successfully!" |
else: |
firestore_msg = "Sorry, could not record transaction!" |
elif trans_type == 'sale': |
if create_sale(mobile, parsed_trans_data): |
firestore_msg = "Transaction recorded successfully!" |
else: |
firestore_msg = "Sorry, could not record transaction!" |
elif intent == 'update': |
pass |
elif intent == 'delete': |
pass |
else: |
firestore_msg = f'The detected intent, {intent}, is not currently supported!' |
return firestore_msg |
def handle_interactive_response(mobile, button_id): |
""" |
Handles the user's button response (Confirm or Cancel). |
""" |
doc_id = 'pending-user-action' |
temp_ref = db.collection("users").document(mobile).collection("temp_transactions").document(doc_id) |
transaction = temp_ref.get() |
if transaction.exists: |
transaction_data = transaction.to_dict() |
if button_id == "confirm": |
transactions = transaction_data["transactions"] |
msg = process_intent(transactions, mobile) |
temp_ref.delete() |
messenger.send_message(f"{msg}", recipient_id = mobile) |
elif button_id == "cancel": |
temp_ref.delete() |
messenger.send_message("Transaction has been canceled.", recipient_id = mobile) |
else: |
messenger.send_message("Invalid action. Please try again.", recipient_id = mobile) |
else: |
messenger.send_message("No pending transaction found.", recipient_id = mobile) |
@app.route("/", methods=["GET", "POST"]) |
def hook(): |
if request.method == "GET": |
if request.args.get("hub.verify_token") == VERIFY_TOKEN: |
logging.info("Verified webhook") |
response = make_response(request.args.get("hub.challenge"), 200) |
response.mimetype = "text/plain" |
return response |
logging.error("Webhook Verification failed") |
return "Invalid verification token" |
data = request.get_json() |
changed_field = messenger.changed_field(data) |
if changed_field == "messages": |
new_message = messenger.get_mobile(data) |
if new_message: |
mobile = messenger.get_mobile(data) |
message_type = messenger.get_message_type(data) |
if message_type == "text": |
message = messenger.get_message(data) |
if message.lower() in ("hi", "hello", "hola", "help", "how are you"): |
response = "Hi there! My name is SmartLedger. How can I help you today?" |
messenger.send_message(message=f"{response}", recipient_id=mobile) |
else: |
process_user_msg(message, mobile) |
elif message_type == "audio": |
audio = messenger.get_audio(data) |
audio_id, mime_type = audio["id"], audio["mime_type"] |
audio_url = messenger.query_media_url(audio_id) |
audio_filename = messenger.download_media(audio_url, mime_type) |
transcript = transcriber.transcribe(audio_filename) |
print(audio_filename) |
print(transcript.text) |
transcribed_message = transcript.text |
process_user_msg(transcribed_message, mobile) |
elif message_type == "interactive": |
message_response = messenger.get_interactive_response(data) |
interactive_type = message_response.get("type") |
message_id = message_response[interactive_type]["id"] |
message_text = message_response[interactive_type]["title"] |
logging.info(f"Interactive Message: {interactive_type} {message_id}: {message_text}") |
handle_interactive_response(mobile, message_id) |
else: |
messenger.send_message(message=f"Please send me text or audio messages", recipient_id=mobile) |
return "ok" |
if __name__ == '__main__': |
app.run(debug=True, host="", port=7860) |