Spaces:
Sleeping
Sleeping
import re | |
import os | |
from datetime import datetime | |
import openai | |
from google.cloud import firestore | |
from dotenv import load_dotenv | |
from pandasai import SmartDatalake | |
from pandasai import Agent | |
from pandasai.responses.response_parser import ResponseParser | |
import pandas as pd | |
from pandasai.llm import OpenAI | |
#from langchain.llms.sambanova import Sambaverse | |
#from langchain_community.llms.sambanova import Sambaverse | |
from langchain_community.chat_models.sambanova import ChatSambaNovaCloud | |
import ast | |
import json | |
# Make API connection | |
load_dotenv() | |
# gemini_api_key = os.environ['Gemini'] | |
# o_api_key = os.getenv("openai_api_key") | |
# openai.api_key = o_api_key | |
# Authenticate to Firesotre with the JSON account key | |
db = firestore.Client.from_service_account_json("firestore-key.json") | |
client = openai.OpenAI( | |
api_key=os.environ.get("SAMBANOVA_API_KEY"), | |
base_url="https://api.sambanova.ai/v1", | |
) | |
sambaverse_api_key = os.environ.get("SAMBANOVA_API_KEY") | |
llm = ChatSambaNovaCloud( | |
model="Meta-Llama-3.1-70B-Instruct", | |
max_tokens=1024, | |
temperature=0.7, | |
top_k=1, | |
top_p=0.01, | |
) | |
print(llm.invoke("Why should I use open source models?")) | |
class FlaskResponse(ResponseParser): | |
def __init__(self, context) -> None: | |
super().__init__(context) | |
def format_dataframe(self, result): | |
return result['value'].to_html() | |
def format_plot(self, result): | |
# Save the plot using savefig | |
try: | |
img_path = result['value'] | |
except ValueError: | |
img_path = str(result['value']) | |
print("value error!", img_path) | |
print("response_class_path:", img_path) | |
return img_path | |
def format_other(self, result): | |
return str(result['value']) | |
# Generate AI response from user input | |
def generateResponse(prompt,model='Meta-Llama-3.1-70B-Instruct'): | |
#----- Call API to classify and extract relevant transaction information | |
# These templates help provide a unified response format for use as context clues when | |
# parsing the AI generated response into a structured data format | |
relevant_info_template = """ | |
Intent: The CRUD operation, one of create, read, update, or delete | |
Transaction Type: The type of transaction such as purchases, sales | |
Details: as a sublist of the key details like name of item, quantity, cost price, currency, unit for the quantity, description, among other details you are able to extract. | |
""" | |
sample_single_transaction_template = """ | |
The information provided indicates that you want to *create/record* a new transaction. | |
*Extracted Information*: | |
*Intent*: Create | |
*Transaction Type*: Purchase | |
*Details*: | |
- Item: Car | |
- Purpose: Business | |
- Quantity: 1 | |
- Unit: None | |
- Cost: 10000 | |
- Tax: 200 | |
- Currency: USD | |
- Note: A new car for business | |
""" | |
sample_multi_transaction_template = """ | |
The information provided indicates that you want to *create/record* a new transaction. | |
*Extracted Information*: | |
*Intent*: Create | |
Transaction 1: | |
*Transaction Type*: Purchase | |
*Details*: | |
- Item: Car | |
- Purpose: Business | |
- Quantity: 1 | |
- Unit: None | |
- Cost: 10000 | |
- Tax: 200 | |
- Currency: USD | |
- Note: A new car for business | |
Transaction 2: | |
*Transaction Type*: Expense | |
*Details*: | |
- Item: Office Chair | |
- Quantity: 2 | |
- Unit: None | |
- Cost: 300 | |
- Currency: USD | |
- Category: Furniture | |
""" | |
response = client.chat.completions.create( | |
model = model, | |
# model="gpt-4o", | |
messages=[ | |
{"role": "system", "content": f"You are a helpful assistant that classifies transactions written in natural language into CRUD operations (Create, Read, Update, and Delete) and extracts relevant information. For update and delete queries, the transaction type should either be sales or inventory. You should be able to recognize the currency being used and any quantity units into separate fields. Format the relevant information extracted from the transaction text in this format: {relevant_info_template}. A sample response for a single transaction could look like this: {sample_single_transaction_template}, while multiple transactions could look like this: {sample_multi_transaction_template}. There should be only one intent even in the case of multiple transactions."}, | |
{"role": "user", "content": prompt} | |
] | |
) | |
#----- Process response | |
try: | |
response = response.choices[0].message.content | |
except Exception as e: | |
print(f'An error occurred: {str(e)}') | |
response = None | |
return response | |
def parse_value(value): | |
""" | |
Parses a value string into the appropriate data type and detects currency. | |
Handles various currencies, percentages, numbers, and text. | |
""" | |
value = value.strip() | |
try: | |
# Match currency codes or symbols dynamically | |
currency_match = re.search(r"([A-Z]{3}|\$|€|£)", value) | |
currency = currency_match.group(1) if currency_match else None | |
# Remove currency symbols or codes for numeric conversion | |
cleaned_value = re.sub(r"[A-Z]{3}|\$|€|£", "", value).replace(",", "").strip() | |
# Handle percentages | |
if "%" in cleaned_value: | |
return float(cleaned_value.replace("%", "")), currency | |
# Handle plain numbers (integers or floats) | |
elif cleaned_value.replace(".", "", 1).isdigit(): | |
return float(cleaned_value) if "." in cleaned_value else int(cleaned_value), currency | |
# Return as text if no numeric parsing is possible | |
return value, currency | |
except ValueError: | |
# Fallback to original value if parsing fails | |
return value, None | |
def extract_transaction_details(text): | |
""" | |
Extracts transaction details from a given text input. | |
Handles both bold (**field**) and non-bold (field) formats. | |
""" | |
details = {} | |
transaction_currency = None # Default currency field | |
# Regex to match key-value pairs | |
detail_matches = re.findall( | |
r"-\s*\*{0,2}([\w\s]+)\*{0,2}:\s*([\w\s,.$%-]+?)(?:\s*[\n]|$)", # Stop matching before newline or end of string | |
text, | |
re.DOTALL | |
) | |
# print("Detail matches:", detail_matches) # Debugging | |
for field, value in detail_matches: | |
# Standardize the field name (convert to snake_case) | |
field = field.strip().lower().replace(" ", "_") | |
# Parse the value and dynamically detect currency | |
parsed_value, detected_currency = parse_value(value) | |
if detected_currency and not transaction_currency: | |
transaction_currency = detected_currency # Set the transaction-level currency if not already set | |
details[field] = parsed_value | |
# Add currency as a separate field | |
if transaction_currency: | |
details["currency"] = transaction_currency | |
return details | |
# Parsing single transactions | |
def parse_ai_response(response_text): | |
# Initialize the structured data dictionary | |
data = { | |
"intent": None, | |
"transaction_type": None, | |
"details": {}, | |
"created_at": datetime.now().isoformat() # Add current date and time | |
} | |
# Extract the intent | |
intent_match = re.search(r"\*Intent\*:\s*(\w+)", response_text) | |
if intent_match: | |
data["intent"] = intent_match.group(1) | |
# Extract the transaction type | |
transaction_type_match = re.search(r"\*Transaction Type\*:\s*(\w+)", response_text) | |
if transaction_type_match: | |
data["transaction_type"] = transaction_type_match.group(1) | |
# Store details in the structured data | |
data["details"] = extract_transaction_details(response_text) | |
return data | |
def parse_multiple_transactions(response_text): | |
transactions = [] | |
# Split the response into transaction sections based on keyword 'Transaction #' | |
transaction_sections = re.split(r"Transaction \d+:", response_text, flags=re.IGNORECASE) | |
# Adjust regex to handle variations in "Transaction X:" | |
# transaction_sections = re.split(r"(?i)(?<=\n)transaction\s+\d+:", response_text) | |
transaction_sections = [section.strip() for section in transaction_sections if section.strip()] | |
# Remove the first section if it's not a valid transaction | |
if not re.search(r"\*Transaction Type\*", transaction_sections[0], re.IGNORECASE): | |
transaction_sections.pop(0) | |
# Extract intent: with support for a single intent per user prompt | |
intent_match = re.search(r"\*Intent\*:\s*(\w+)", response_text) | |
if intent_match: | |
intent = intent_match.group(1) | |
for section in transaction_sections: | |
# Initialize transaction data | |
transaction_data = { | |
"intent": intent, # global intent | |
"transaction_type": None, | |
"details": {}, | |
"created_at": datetime.now().isoformat() | |
} | |
# Extract transaction type | |
transaction_type_match = re.search(r"\*Transaction Type\*:\s*(\w+)", section) | |
if transaction_type_match: | |
transaction_data["transaction_type"] = transaction_type_match.group(1) | |
# Extract details | |
transaction_data["details"] = extract_transaction_details(section) | |
transactions.append(transaction_data) | |
return transactions | |
def read_datalake(user_phone, user_question): | |
inventory_ref = db.collection("users").document(user_phone).collection("inventory") | |
sales_ref = db.collection("users").document(user_phone).collection('sales') | |
inventory_list = [doc.to_dict() for doc in inventory_ref.stream()] | |
sales_list = [doc.to_dict() for doc in sales_ref.stream()] | |
inventory_df = pd.DataFrame(inventory_list) | |
sales_df = pd.DataFrame(sales_list) | |
lake = SmartDatalake([inventory_df, sales_df], config={"llm": llm, "custom_whitelisted_dependencies":["ast"], "response_parser": FlaskResponse, "enable_cache": False, "save_logs": False}) | |
response = lake.chat(user_question) | |
return response | |
def create_inventory(user_phone, transaction_data): | |
for transaction in transaction_data: | |
item_name = transaction['details']['item'] # assumes unique item name | |
doc_ref = db.collection("users").document(user_phone).collection("inventory").document(item_name) | |
# transaction_data['transaction_id'] # let's default to random generated document ids | |
doc_ref.set(transaction) | |
# print("Transaction created successfully!") | |
return True | |
def create_sale(user_phone, transaction_data): | |
for transaction in transaction_data: | |
item_name = transaction['details']['item'] # assumes item names are unique per transactions | |
# fetch the inventory | |
inventory = fetch_transaction(user_phone, item_name) | |
# Do the sales calculations | |
new_stock_level = inventory['details']['quantity'] - transaction['details']['quantity'] | |
inventory['details']['quantity'] = new_stock_level | |
# update the inventory | |
doc_ref = db.collection("users").document(user_phone).collection("inventory").document(item_name) | |
doc_ref.set(inventory) | |
# Create the sale | |
doc_ref = db.collection("users").document(user_phone).collection("sales").document(item_name) | |
doc_ref.set(transaction) | |
# print("Transaction created successfully!") | |
return True | |
# Update logic: | |
# - | |
def update_transaction(user_phone, transaction_id, update_data): | |
doc_ref = db.collection("users").document(user_phone).collection("transactions").document(transaction_id) | |
doc_ref.update(update_data) | |
# print("Transaction updated successfully!") | |
return True | |
def fetch_transaction(user_phone, transaction_id=None): | |
if transaction_id: | |
doc_ref = db.collection("users").document(user_phone).collection("inventory").document(transaction_id) | |
transaction = doc_ref.get() | |
if transaction.exists: | |
return transaction.to_dict() | |
else: | |
# print("Transaction not found.") | |
return None | |
else: | |
collection_ref = db.collection("users").document(user_phone).collection("inventory") | |
transactions = [doc.to_dict() for doc in collection_ref.stream()] | |
return transactions | |
# Delete fields or an entire transaction document. | |
# Delete specific fields | |
def delete_transaction_fields(user_phone, transaction_id, fields_to_delete): | |
doc_ref = db.collection("users").document(user_phone).collection("transactions").document(transaction_id) | |
updates = {field: firestore.DELETE_FIELD for field in fields_to_delete} | |
doc_ref.update(updates) | |
print("Fields deleted successfully!") | |
# Delete an entire transaction | |
def delete_transaction(user_phone, transaction_id): | |
doc_ref = db.collection("users").document(user_phone).collection("transactions").document(transaction_id) | |
doc_ref.delete() | |
print("Transaction deleted successfully!") | |
# Delete an entire transaction | |
def delete_transaction(user_phone, transaction_data): | |
# the transaction id currently defaults to the item name | |
transaction_id = transaction_data[0]['details']['item'] | |
transaction_type = transaction_data[0]['transaction_type'].lower() | |
doc_ref = db.collection("users").document(user_phone).collection(transaction_type).document(transaction_id) | |
item_doc = doc_ref.get() | |
if item_doc.exists: | |
doc_ref.delete() | |
return True | |
else: | |
return False | |
# Example usage | |
# response_text = """ | |
# The information provided indicates that you want to **create/record** a new transaction. | |
# **Extracted Information**: | |
# **Intent**: Create | |
# **Transaction Type**: Purchase | |
# **Details**: | |
# - Item: Car | |
# - Purpose: Business | |
# - Amount: 100000 USD | |
# - Tax: 1000 USD | |
# """ | |
# parsed_data = parse_ai_response(response_text) | |
# print(parsed_data) | |