"""IMAP shipment-email processor with a FastAPI control panel.

Polls an inbox for unread messages, asks a local Llama model to extract
shipment details as JSON, and stores the results in MySQL.  A minimal web
UI starts and stops the background processing loop.
"""

from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import HTMLResponse
import threading
from email.header import decode_header
import mysql.connector
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM
import email, imaplib, json, time
import torch, logging
import uvicorn
from pydantic import BaseModel
# FIX: `Llama` is used in get_details() but was never imported (NameError).
from llama_cpp import Llama

app = FastAPI()

# Configure logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Email and database configuration.
# SECURITY NOTE(review): credentials are hardcoded — move them to environment
# variables or a secrets manager before deploying.  The trailing space in
# PASSWORD looks accidental; confirm (Gmail ignores spaces in app passwords).
IMAP_SERVER = 'imap.gmail.com'
EMAIL_ADDRESS = 'narayanansubramani14@gmail.com'
PASSWORD = 'gclc wsnx kywt uvqy '  # Store this securely in production
DB_CONFIG = {
    'host': '0.tcp.in.ngrok.io',
    'port': 11329,
    'user': 'root',
    'password': '',  # Add the correct password
    'database': 'shipment_details'
}

# JSON skeleton the LLM is asked to fill in for each email.
output_format = {
    "origin": "",
    "destination": "",
    "Expected_shipment_datetime": "",
    "Types of service": "",
    "Warehouse": "",
    "Description": "",
    "Quantities": "",
    "Carrier_details": ""
}

# Prompt for LLM to process shipment-related emails
prompt = """
System prompt:
You will be provided with an email containing shipment details. Your task is to extract specific information based on the given instructions.
Instructions:
1. Focus only on extracting details about future shipments, ignore irrelevant information.
2. Output should be in JSON format. Missing information should be marked as null.
3. Extract the following:
   - origin
   - destination
   - expected_shipment_datetime (format: yyyy-mm-dd hh:mm:ss)
   - types_of_service (AIR, LCL, FCL)
   - warehouse
   - description (max 100 words)
   - quantities
   - carrier_details
4. The output should be formatted as follows:
{
    "origin": "",
    "destination": "",
    "expected_shipment_datetime": "",
    "types_of_service": "",
    "warehouse": "",
    "description": "",
    "quantities": "",
    "carrier_details": ""
}
"""


def insert_data(extracted_details):
    """Insert one extracted-shipment record into MySQL.

    Skips the insert entirely when every shipment field is empty/None.
    Errors are logged, never raised (best-effort pipeline stage).

    Args:
        extracted_details: dict with the shipment fields plus the email
            metadata keys sender/receiver/cc/bcc/subject.
    """
    required_fields = [
        'origin', 'destination', 'expected_shipment_datetime',
        'types_of_service', 'warehouse', 'description',
        'quantities', 'carrier_details'
    ]
    # Check before connecting — no point opening a connection just to skip.
    if all(extracted_details.get(field) in ("", None) for field in required_fields):
        logger.info("Skipping insertion: All extracted values are empty.")
        return

    mydb = None
    cursor = None
    try:
        mydb = mysql.connector.connect(**DB_CONFIG)
        cursor = mydb.cursor()
        # Parameterized query — values are never interpolated into the SQL.
        sql = """
            INSERT INTO shipment_details (
                origin, destination, expected_shipment_datetime, types_of_service,
                warehouse, description, quantities, carrier_details,
                sender, receiver, cc, bcc, subject
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """
        values = (
            extracted_details.get('origin'),
            extracted_details.get('destination'),
            extracted_details.get('expected_shipment_datetime'),
            extracted_details.get('types_of_service'),
            extracted_details.get('warehouse'),
            extracted_details.get('description'),
            extracted_details.get('quantities'),
            extracted_details.get('carrier_details'),
            extracted_details.get('sender'),
            extracted_details.get('receiver'),
            extracted_details.get('cc'),
            extracted_details.get('bcc'),
            extracted_details.get('subject')
        )
        cursor.execute(sql, values)
        mydb.commit()
        logger.info("Data inserted successfully.")
    except mysql.connector.Error as db_err:
        logger.error(f"Database error: {db_err}")
    except Exception as ex:
        logger.error(f"Error inserting data: {ex}")
    finally:
        # FIX: the cursor and connection previously leaked on every call.
        if cursor is not None:
            cursor.close()
        if mydb is not None:
            mydb.close()


def get_details(mail):
    """Ask the local Llama model to extract shipment details from an email body.

    Args:
        mail: plain-text email body.

    Returns:
        The raw model response text (expected to be JSON), or None on error.

    NOTE(review): the model is reloaded from disk on every call, which is
    slow; consider loading it once at module scope if startup cost is OK.
    """
    try:
        llm = Llama(model_path="./ggml-model-q8_0.gguf", n_ctx=2048, n_batch=512)
        response = llm.create_chat_completion(
            messages=[
                {"role": "system", "content": prompt},
                {"role": "user", "content": mail}
            ],
            max_tokens=200
        )
        return response['choices'][0]['message']['content']
    except Exception as ex:
        logger.error(f"Error generating details from LLM: {ex}")
        return None


def _decode_subject(raw_subject):
    """Decode an RFC 2047 encoded Subject header into a plain string."""
    if not raw_subject:
        return ''
    parts = []
    for text, charset in decode_header(raw_subject):
        if isinstance(text, bytes):
            parts.append(text.decode(charset or 'utf-8', errors='replace'))
        else:
            parts.append(text)
    return ''.join(parts)


def read_email():
    """Fetch unread inbox messages, extract shipment details, store them.

    Per-message failures are logged and skipped so one bad email does not
    abort the whole batch.  The IMAP session is always released.
    """
    logger.info('ready to read email ! ...')
    mail = None
    try:
        logger.info('get imap server!')
        mail = imaplib.IMAP4_SSL(IMAP_SERVER)
        mail.login(EMAIL_ADDRESS, PASSWORD)
        mail.select('inbox')
        logger.info('select mail inbox')

        status, messages = mail.search(None, 'UNSEEN')
        message_ids = messages[0].split()
        logger.info(f"Total unread emails: {len(message_ids)}")

        for message_id in message_ids:
            try:
                status, data = mail.fetch(message_id, '(RFC822)')
                raw_email = data[0][1]
                email_message = email.message_from_bytes(raw_email)

                # Extract metadata
                sender = email_message['From']
                receiver = email_message['To']
                cc = email_message.get('Cc', '')
                bcc = email_message.get('Bcc', '')
                subject = _decode_subject(email_message['Subject'])

                # FIX: email_body could be referenced while unbound when a
                # multipart message carried no text/plain part.
                email_body = ""
                if email_message.is_multipart():
                    for part in email_message.walk():
                        if part.get_content_type() == 'text/plain':
                            email_body = part.get_payload(decode=True).decode(
                                'utf-8', errors='replace')
                            break
                else:
                    payload = email_message.get_payload(decode=True)
                    if payload:
                        email_body = payload.decode('utf-8', errors='replace')

                # FIX: get_details() may return None; json.loads(None) crashed.
                extracted_details_str = get_details(email_body)
                if not extracted_details_str:
                    logger.error(f"No LLM output for email {message_id}; skipping.")
                    continue
                extracted_details = json.loads(extracted_details_str)
                extracted_details.update({
                    'sender': sender,
                    'receiver': receiver,
                    'cc': cc,
                    'bcc': bcc,
                    'subject': subject
                })
                insert_data(extracted_details)
            except Exception as e:
                logger.error(f"Error processing email {message_id}: {e}")
    except Exception as e:
        logger.error(f"Error reading emails: {e}")
    finally:
        # FIX: always release the IMAP connection, even after an error.
        if mail is not None:
            try:
                mail.close()
                mail.logout()
            except Exception:
                pass


# Email processing loop state (mutated by the /control endpoint).
running = False
loop_thread = None

# Status page template; {{ status }} is substituted at render time.
html_content = """
<!DOCTYPE html>
<html>
<head><title>Email Processing</title></head>
<body>
    <h1>Email Processing</h1>
    <p>Status: {{ status }}</p>
</body>
</html>
"""


class ActionModel(BaseModel):
    # The action can be 'start' or 'stop'
    action: str


class ModelData(BaseModel):
    data: str


def email_processing_loop():
    """Background loop: process the inbox every 10 s while `running` is True."""
    global running
    logger.info("Starting email processing loop...")
    while running:
        # FIX: the loop previously only printed a banner and never processed mail.
        read_email()
        time.sleep(10)  # Check for new emails every 10 seconds


@app.get("/", response_class=HTMLResponse)
async def home():
    """Render the current processor status page."""
    status = "Running" if running else "Stopped"
    return HTMLResponse(content=html_content.replace("{{ status }}", status),
                        status_code=200)


@app.post("/control", response_class=HTMLResponse)
async def control_email_loop(action: ActionModel, data: ModelData):
    """Start or stop the background email-processing loop.

    Raises:
        HTTPException(400): when action is neither 'start' nor 'stop'.
    """
    global running, loop_thread
    logger.info(action.action)
    if action.action == "start":
        if not running:
            running = True
            # FIX: `data.data` is a plain str; the old `.dict()` call crashed
            # on every successful start request.
            logger.info(data.data)
            loop_thread = threading.Thread(target=email_processing_loop, daemon=True)
            loop_thread.start()
            logger.info("Email processing loop started.")
        else:
            logger.info("Email processing loop is already running.")
    elif action.action == "stop":
        if running:
            running = False
            logger.info("Email processing loop stopped.")
        else:
            logger.info("Email processing loop is not running.")
    else:
        # FIX: HTTPException is now imported from fastapi (was a NameError).
        raise HTTPException(status_code=400,
                            detail="Invalid action. Use 'start' or 'stop'.")
    status = "Running" if running else "Stopped"
    return HTMLResponse(content=html_content.replace("{{ status }}", status),
                        status_code=200)


if __name__ == "__main__":
    logger.info('starting project!...')
    # FIX: uvicorn.run() without arguments raises; pass the app explicitly.
    uvicorn.run(app, host="0.0.0.0", port=8000)