|
import re |
|
import os |
|
from datetime import datetime |
|
import openai |
|
from google.cloud import firestore |
|
from dotenv import load_dotenv |
|
from pandasai import SmartDatalake |
|
from pandasai import Agent |
|
from pandasai.responses.response_parser import ResponseParser |
|
import pandas as pd |
|
from pandasai.llm import OpenAI |
|
|
|
|
|
from langchain_community.chat_models.sambanova import ChatSambaNovaCloud |
|
|
|
|
|
|
|
# Load variables from a local .env file into the process environment before
# any of the lookups below run.
load_dotenv()

# Key for the module-level `openai` configuration.
o_api_key = os.environ.get("openai_api_key")
openai.api_key = o_api_key

# Firestore handle shared by every storage helper in this module.
db = firestore.Client.from_service_account_json("firestore-key.json")

# OpenAI-compatible client pointed at the SambaNova endpoint.
client = openai.OpenAI(
    base_url="https://api.sambanova.ai/v1",
    api_key=os.getenv("sambanova_api_key"),
)

# Read here but not referenced again in the visible part of this file.
sambaverse_api_key = os.getenv("SAMBAVERSE_API_KEY")

# Chat model handed to the data-lake helper further down.
# NOTE(review): no API key is passed explicitly; presumably the wrapper reads
# it from the environment -- confirm which variable name it expects.
llm = ChatSambaNovaCloud(
    model="Meta-Llama-3.1-70B-Instruct",
    max_tokens=1024,
    temperature=0.7,
    top_k=1,
    top_p=0.01,
)

# NOTE(review): this issues a model call every time the module is imported;
# it looks like a leftover smoke test -- confirm before relying on it.
print(llm.invoke("Why should I use open source models?"))
|
|
|
|
|
|
|
class FlaskResponse(ResponseParser):
    """Response parser that reduces each result to a plain value.

    Every formatter receives a mapping and reads its ``'value'`` entry.
    """

    def __init__(self, context) -> None:
        super().__init__(context)

    def format_dataframe(self, result):
        """Return the frame stored under ``result['value']`` rendered as HTML."""
        frame = result['value']
        return frame.to_html()

    def format_plot(self, result):
        """Return ``result['value']`` for a plot result, logging it to stdout."""
        try:
            plot_location = result['value']
        except ValueError:
            # Fallback kept from the original implementation: retry the
            # lookup and coerce the value to text.
            plot_location = str(result['value'])
            print("value error!", plot_location)

        print("response_class_path:", plot_location)
        return plot_location

    def format_other(self, result):
        """Return ``result['value']`` converted to ``str``."""
        payload = result['value']
        return str(payload)
|
|
|
|
|
# Field layout the model is asked to follow when reporting what it extracted.
_RELEVANT_INFO_TEMPLATE = """
Intent: The CRUD operation, one of create, read, update, or delete
Transaction Type: The type of transaction such as purchases, sales
Details: as a sublist of the key details like name of item, quantity, cost price, currency, unit for the quantity, description, among other details you are able to extract.
"""

# Example answer for a message that describes a single transaction.
_SAMPLE_SINGLE_TRANSACTION_TEMPLATE = """
The information provided indicates that you want to **create/record** a new transaction.
**Extracted Information**:

**Intent**: Create

**Transaction Type**: Purchase
**Details**:
- Item: Car
- Purpose: Business
- Quantity: 1
- Unit: None
- Cost: 10000
- Tax: 200
- Currency: USD
- Note: A new car for business
"""

# Example answer for a message that describes several transactions.
_SAMPLE_MULTI_TRANSACTION_TEMPLATE = """
The information provided indicates that you want to **create/record** a new transaction.

**Extracted Information**:

**Intent**: Create

Transaction 1:

**Transaction Type**: Purchase
**Details**:
- Item: Car
- Purpose: Business
- Quantity: 1
- Unit: None
- Cost: 10000
- Tax: 200
- Currency: USD
- Note: A new car for business

Transaction 2:

**Transaction Type**: Expense
**Details**:
- Item: Office Chair
- Quantity: 2
- Unit: None
- Cost: 300
- Currency: USD
- Category: Furniture
"""

# The system prompt never changes between calls, so it is assembled once.
_SYSTEM_PROMPT = (
    "You are a helpful assistant that classifies transactions written in "
    "natural language into CRUD operations (Create, Read, Update, and Delete) "
    "and extracts relevant information. You should be able to recognize the "
    "currency being used and any quantity units into separate fields. Format "
    "the relevant information extracted from the transaction text in this "
    f"format: {_RELEVANT_INFO_TEMPLATE}. Use markdown syntax to present a "
    "nicely formated and readable response to the user, but make sure the "
    "user does not see the markdown keyword. Keywords and field names must be "
    "in bold face. A sample response for a single transaction could look like "
    f"this: {_SAMPLE_SINGLE_TRANSACTION_TEMPLATE}, while multiple transactions "
    f"could look like this: {_SAMPLE_MULTI_TRANSACTION_TEMPLATE}. There should "
    "be only one intent even in the case of multiple transactions."
)


def generateResponse(prompt, model='Meta-Llama-3.1-70B-Instruct'):
    """Ask the chat model to classify and itemise a free-text transaction.

    Args:
        prompt: The user's transaction description, sent as the user message.
        model: Model identifier passed to the chat-completions endpoint.

    Returns:
        The text content of the first completion choice, or ``None`` when the
        request fails or the reply has no usable choice.
    """
    try:
        # The request itself is the step most likely to fail (network, auth,
        # rate limits), so it sits inside the same guard as the unpacking of
        # the reply; previously only the unpacking was protected.
        completion = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": _SYSTEM_PROMPT},
                {"role": "user", "content": prompt},
            ],
        )
        return completion.choices[0].message.content
    except Exception as e:
        print(f'An error occurred: {str(e)}')
        return None
|
|
|
|
|
# A currency marker is one of the symbols $, € or £, or a run of exactly three
# capital letters that is not part of a longer word (so "USD" and "100USD"
# qualify, while "NASA" or "USDollar" do not).
_CURRENCY_RE = re.compile(r"(?<![A-Za-z])[A-Z]{3}(?![A-Za-z])|\$|€|£")

# Optionally signed integer or decimal, e.g. "12", "-3.5", "5." or ".5".
_PLAIN_NUMBER_RE = re.compile(r"[+-]?(?:\d+\.?\d*|\.\d+)")


def parse_value(value):
    """Convert a raw field value to a typed value and detect its currency.

    Args:
        value: The raw text of a field. Non-string input is returned as is.

    Returns:
        A ``(parsed, currency)`` tuple. ``parsed`` is a ``float`` for
        percentages and decimals, an ``int`` for whole numbers, and otherwise
        the stripped input text. ``currency`` is the first currency marker
        found in the text, or ``None``.

    Note:
        Any stand-alone three-capital-letter token is treated as a currency
        code; there is no check against a list of real codes.
    """
    if not isinstance(value, str):
        return value, None

    value = value.strip()

    marker = _CURRENCY_RE.search(value)
    currency = marker.group(0) if marker else None

    # Drop currency markers and thousands separators before numeric parsing.
    cleaned_value = _CURRENCY_RE.sub("", value).replace(",", "").strip()

    try:
        if "%" in cleaned_value:
            return float(cleaned_value.replace("%", "")), currency

        if _PLAIN_NUMBER_RE.fullmatch(cleaned_value):
            if "." in cleaned_value:
                return float(cleaned_value), currency
            return int(cleaned_value), currency
    except ValueError:
        # Text that only looked numeric (e.g. "abc%") is handed back as is.
        return value, None

    return value, currency
|
|
|
def extract_transaction_details(text):
    """Collect ``- Field: value`` bullet lines from ``text`` into a dict.

    Field names may be plain or wrapped in ``**``. Each name is stripped,
    lower-cased and has spaces replaced by underscores; each value is
    converted with ``parse_value``.

    Args:
        text: The text to scan. ``None`` or an empty string yields ``{}``.

    Returns:
        A dict mapping normalised field names to parsed values. If a currency
        marker was detected in some value and the text carries no usable
        explicit ``currency`` field, ``"currency"`` is set to the first
        marker detected.
    """
    details = {}
    if not text:
        return details

    # The value class includes the euro and pound signs so that values such
    # as "€500" are not dropped before parse_value gets to see them.
    bullet_pattern = (
        r"-\s*\*{0,2}([\w\s]+)\*{0,2}:\s*([\w\s,.$€£%-]+?)(?:\s*[\n]|$)"
    )

    transaction_currency = None
    for raw_field, raw_value in re.findall(bullet_pattern, text):
        field = raw_field.strip().lower().replace(" ", "_")

        parsed_value, detected_currency = parse_value(raw_value)
        if detected_currency and not transaction_currency:
            transaction_currency = detected_currency

        details[field] = parsed_value

    # An explicit "Currency: XYZ" line is the authoritative answer; only fall
    # back to the detected marker when that field is missing or is a
    # placeholder such as "None".
    explicit = details.get("currency")
    has_explicit = (
        isinstance(explicit, str)
        and explicit.strip().lower() not in ("", "none")
    )
    if transaction_currency and not has_explicit:
        details["currency"] = transaction_currency

    return details
|
|
|
|
|
|
|
def parse_ai_response(response_text):
    """Turn a single-transaction model reply into a structured record.

    Args:
        response_text: The reply text. ``None`` or an empty string is
            accepted and yields a record with no intent, no transaction type
            and empty details.

    Returns:
        A dict with the keys ``intent``, ``transaction_type``, ``details``
        and ``created_at`` (ISO-8601 text of the local time at parsing).
    """
    data = {
        "intent": None,
        "transaction_type": None,
        "details": {},
        "created_at": datetime.now().isoformat(),
    }

    # The response generator in this module returns None when the model call
    # fails; return the empty record instead of crashing inside re.search.
    if not response_text:
        return data

    intent_match = re.search(r"\*\*Intent\*\*:\s*(\w+)", response_text)
    if intent_match:
        data["intent"] = intent_match.group(1)

    transaction_type_match = re.search(
        r"\*\*Transaction Type\*\*:\s*(\w+)", response_text
    )
    if transaction_type_match:
        data["transaction_type"] = transaction_type_match.group(1)

    data["details"] = extract_transaction_details(response_text)

    return data
|
|
|
def parse_multiple_transactions(response_text):
    """Split a model reply into one structured record per transaction.

    The reply is cut at every ``Transaction <n>:`` heading. A leading chunk
    without a ``**Transaction Type**`` marker is treated as preamble and
    dropped. The single ``**Intent**`` found anywhere in the reply is shared
    by every record.

    Args:
        response_text: The reply text. ``None``, empty or whitespace-only
            input yields an empty list.

    Returns:
        A list of dicts with the keys ``intent``, ``transaction_type``,
        ``details`` and ``created_at``.
    """
    if not response_text:
        return []

    raw_sections = re.split(
        r"Transaction \d+:", response_text, flags=re.IGNORECASE
    )
    transaction_sections = [
        section.strip() for section in raw_sections if section.strip()
    ]

    # Guard against an empty list before peeking at the first chunk.
    if transaction_sections and not re.search(
        r"\*\*Transaction Type\*\*", transaction_sections[0], re.IGNORECASE
    ):
        transaction_sections.pop(0)

    # Default to None so a reply without an intent marker no longer raises
    # an unbound-variable error inside the loop.
    intent_match = re.search(r"\*\*Intent\*\*:\s*(\w+)", response_text)
    intent = intent_match.group(1) if intent_match else None

    transactions = []
    for section in transaction_sections:
        type_match = re.search(r"\*\*Transaction Type\*\*:\s*(\w+)", section)
        transactions.append(
            {
                "intent": intent,
                "transaction_type": type_match.group(1) if type_match else None,
                "details": extract_transaction_details(section),
                "created_at": datetime.now().isoformat(),
            }
        )

    return transactions
|
|
|
def read_datalake(user_phone, user_question):
    """Answer ``user_question`` over the user's inventory and sales records.

    Both sub-collections of ``users/<user_phone>`` are loaded into DataFrames
    and passed to a ``SmartDatalake`` configured with the module-level model
    and response parser.

    Args:
        user_phone: Document id of the user under the ``users`` collection.
        user_question: Question forwarded to the data lake's ``chat`` call.

    Returns:
        Whatever ``SmartDatalake.chat`` returns for the question.
    """
    user_doc = db.collection("users").document(user_phone)

    # Inventory first, then sales -- the frames are handed over in that order.
    frames = []
    for collection_name in ("inventory", "sales"):
        snapshots = user_doc.collection(collection_name).stream()
        frames.append(pd.DataFrame([snap.to_dict() for snap in snapshots]))

    lake_config = {
        "llm": llm,
        "response_parser": FlaskResponse,
        "enable_cache": False,
        "save_logs": False,
    }
    return SmartDatalake(frames, config=lake_config).chat(user_question)
|
|
|
def create_inventory(user_phone, transaction_data):
    """Store each transaction as an inventory document named after its item.

    Args:
        user_phone: Document id of the user under the ``users`` collection.
        transaction_data: Iterable of transaction dicts; each one must carry
            ``['details']['item']``, which becomes the document id.

    Returns:
        ``True`` once every transaction has been written.
    """
    for record in transaction_data:
        item_key = record['details']['item']
        user_doc = db.collection("users").document(user_phone)
        # set() replaces any document already stored under this item name.
        user_doc.collection("inventory").document(item_key).set(record)

    return True
|
|
|
def create_sale(user_phone, transaction_data):
    """Record sales and reduce the matching inventory quantities.

    For every transaction, the inventory document named after the sold item
    is fetched, its ``details.quantity`` is reduced by the sold quantity and
    written back, and the transaction is stored in the ``sales`` collection
    under the item name.

    Args:
        user_phone: Document id of the user under the ``users`` collection.
        transaction_data: Iterable of transaction dicts; each one must carry
            ``['details']['item']`` and ``['details']['quantity']``.

    Returns:
        ``True`` once every transaction has been processed.

    Raises:
        ValueError: If the sold item has no inventory document. Nothing is
            written for that transaction; earlier ones stay written.
    """
    for transaction in transaction_data:
        item_name = transaction['details']['item']

        print(item_name, '\n')
        print(transaction)

        inventory = fetch_transaction(user_phone, item_name)
        if inventory is None:
            # fetch_transaction returns None for an unknown item; fail with a
            # clear message instead of a TypeError from subscripting None.
            raise ValueError(
                f"Cannot record sale: item {item_name!r} is not in inventory"
            )

        new_stock_level = (
            inventory['details']['quantity'] - transaction['details']['quantity']
        )
        inventory['details']['quantity'] = new_stock_level

        user_doc = db.collection("users").document(user_phone)

        # Write back the reduced stock level first, then the sale itself.
        user_doc.collection("inventory").document(item_name).set(inventory)
        user_doc.collection("sales").document(item_name).set(transaction)

    return True
|
|
|
|
|
|
|
|
|
def update_transaction(user_phone, transaction_id, update_data):
    """Apply ``update_data`` to one document in the user's ``transactions``.

    Args:
        user_phone: Document id of the user under the ``users`` collection.
        transaction_id: Id of the document to update.
        update_data: Mapping of fields passed to the document's ``update``.

    Returns:
        ``True`` after the update call returns.
    """
    # NOTE(review): this targets the "transactions" collection, while the
    # create/fetch helpers in this file use "inventory" and "sales" --
    # confirm that is intended.
    user_doc = db.collection("users").document(user_phone)
    target = user_doc.collection("transactions").document(transaction_id)
    target.update(update_data)
    return True
|
|
|
def fetch_transaction(user_phone, transaction_id=None):
    """Read one inventory document, or all of them, for a user.

    Args:
        user_phone: Document id of the user under the ``users`` collection.
        transaction_id: Id of the inventory document to read. When falsy,
            every document in the collection is returned instead.

    Returns:
        With an id: the document as a dict, or ``None`` if it does not
        exist. Without an id: a list with one dict per inventory document.
    """
    inventory = (
        db.collection("users").document(user_phone).collection("inventory")
    )

    if not transaction_id:
        return [snapshot.to_dict() for snapshot in inventory.stream()]

    snapshot = inventory.document(transaction_id).get()
    return snapshot.to_dict() if snapshot.exists else None
|
|
|
|
|
|
|
def delete_transaction_fields(user_phone, transaction_id, fields_to_delete):
    """Remove the named fields from one ``transactions`` document.

    Args:
        user_phone: Document id of the user under the ``users`` collection.
        transaction_id: Id of the document to modify.
        fields_to_delete: Iterable of field names to remove.
    """
    user_doc = db.collection("users").document(user_phone)
    target = user_doc.collection("transactions").document(transaction_id)

    # Mapping every field to the DELETE_FIELD sentinel makes update() drop it.
    removals = dict.fromkeys(fields_to_delete, firestore.DELETE_FIELD)
    target.update(removals)

    print("Fields deleted successfully!")
|
|
|
|
|
def delete_transaction(user_phone, transaction_id):
    """Delete one document from the user's ``transactions`` collection.

    Args:
        user_phone: Document id of the user under the ``users`` collection.
        transaction_id: Id of the document to delete.
    """
    user_doc = db.collection("users").document(user_phone)
    user_doc.collection("transactions").document(transaction_id).delete()
    print("Transaction deleted successfully!")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|