from fastapi import FastAPI, Request from fastapi.responses import HTMLResponse import threading from email.header import decode_header import mysql.connector from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM import email, imaplib, json, time import torch, logging import uvicorn from pydantic import BaseModel import pandas as pd from llama_cpp import Llama app = FastAPI() # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Email and database configuration DB_CONFIG = { 'host': '0.tcp.in.ngrok.io', 'port': 14458, 'user': 'root', 'password': '', 'database': 'shipment_details' } output_format = { "origin": "", "destination": "", "Expected_shipment_datetime": "", "Types of service": "", "Warehouse": "", "Description": "", "Quantities": "", "Carrier_details": "" } prompt = f""" System prompt: You will be provided with an email containing shipment details. Your task is to extract specific information based on the given instructions. Instructions: 1. The input email may contain irrelevant information. Focus only on extracting details about future shipments. 2. The output should be in JSON format. If a type of information is not found, it should be marked as null. 3. Extract the following information: - origin: The origin location of the consignment. - destination: The destination location of the consignment. - expected_shipment_datetime: The expected date and time of delivery to the warehouse (format: yyyy-mm-dd hh:mm:ss). - types_of_service: The type of service (AIR, LCL, FCL). AIR can be mentioned as flight, aeroplane, or any mode of air transport. LCL is a Less-Container Load, and FCL is a Full-Container Load. - warehouse: The name of the warehouse. - description: A brief description of the email (ASN). - quantities: The number of items in the shipment. - carrier_details: The details of the carrier. 4. the output extracted information contains must be in this format: {{ "origin": "", "destination": "", "expected_shipment_datetime": "", "types_of_service": "", "warehouse": "", "description": "", "quantities": "", "carrier_details": "" }} Examples: 1. Email: We are pleased to inform you of an upcoming shipment originating from Hamburg and destined for New York. The shipment is expected to arrive on August 15, 2024. This consignment includes various electronics, with an estimated quantity of 200 units. The service type for this shipment is AIR, provided by our reliable carrier, Sky Logistics. Extracted Information: origin: Hamburg, destination: New York, expected_shipment_datetime: 2024-08-15 00:00:000, types_of_service: AIR, warehouse: Sky Logistics, description: We are pleased to inform you of an upcoming shipment originating from Hamburg and destined for New York. The shipment is expected to arrive on August 15, 2024., quantities: 200 units, carrier_details: Sky Logistics 2. Email: Please be advised of a shipment from our supplier in Shanghai heading to Los Angeles. The expected date of arrival is July 30, 2024. The shipment consists of mixed goods, mainly textiles, with a total of 500 pieces. This delivery will be handled through LCL service by Ocean Freight Co. Extracted Information: origin: Shanghai, destination: Los Angeles, expected_shipment_datetime: 2024-07-30 00:00:0000, types_of_service: LCL, warehouse: Ocean Freight Co., description: Please be advised of a shipment from our supplier in Shanghai heading to Los Angeles. The expected date of arrival is July 30, 2024., quantities: 500 pieces, carrier_details: Ocean Freight Co. 3. Email: A new shipment is on its way from Mumbai to London, scheduled to reach by August 22, 2024. This batch contains furniture items, totaling 150 pieces. It is managed by Global Carriers. Extracted Information: origin: Mumbai, destination: London, expected_shipment_datetime: 2024-08-22 00:00:00000, types_of_service: null, warehouse: Global Carriers, description: A new shipment is on its way from Mumbai to London, scheduled to reach by August 22, 2024., quantities: 150 pieces, carrier_details: Global Carriers 4. Email: We are notifying you about a shipment dispatched from Tokyo, heading towards Sydney, with an estimated arrival date of September 10, 2024. The cargo includes automotive parts, summing up to 350 units. This shipment will be transported via AIR service, operated by Jet Logistics. Extracted Information: origin: Tokyo, destination: Sydney, expected_shipment_datetime: 2024-09-10 00:00:0000, types_of_service: AIR, warehouse: Jet Logistics, description: We are notifying you about a shipment dispatched from Tokyo, heading towards Sydney, with an estimated arrival date of September 10, 2024., quantities: 350 units, carrier_details: Jet Logistics 5. Email: Just a reminder about our meeting the day after at 10 AM. Extracted Information: origin: null, destination: null, expected_shipment_datetime: 0000-00-00 10:00:0000, types_of_service: null, warehouse: null, description: Just a reminder about our meeting the day after at 10 AM., quantities: null, carrier_details: null Output: {output_format} """ # Function to insert extracted shipment details into MySQL database def insert_data(extracted_details): try: # Initialize MySQL database connection mydb = mysql.connector.connect(**DB_CONFIG) cursor = mydb.cursor() # Skip insertion if all required fields are empty required_fields = [ 'origin', 'destination', 'expected_shipment_datetime', 'types_of_service', 'warehouse', 'description', 'quantities', 'carrier_details' ] if all(extracted_details.get(field) in ["", None] for field in required_fields): logger.info("Skipping insertion: All extracted values are empty.") return # Insert data into database sql = """ INSERT INTO shipment_details ( origin, destination, expected_shipment_datetime, types_of_service, warehouse, description, quantities, carrier_details, sender, receiver, cc, bcc, subject ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ values = ( extracted_details.get('origin'), extracted_details.get('destination'), extracted_details.get('expected_shipment_datetime'), extracted_details.get('types_of_service'), extracted_details.get('warehouse'), extracted_details.get('description'), extracted_details.get('quantities'), extracted_details.get('carrier_details'), extracted_details.get('sender'), extracted_details.get('receiver'), extracted_details.get('cc'), extracted_details.get('bcc'), extracted_details.get('subject') ) cursor.execute(sql, values) mydb.commit() logger.info("Data inserted successfully.") except mysql.connector.Error as db_err: logger.error(f"Database error: {db_err}") except Exception as ex: logger.error(f"Error inserting data: {ex}") llm = Llama(model_path='./ggml-model-f16.gguf', n_ctx=2048, n_batch=2048, n_threads= 204) def read_email(): try: df = pd.read_csv('./emails.csv') for i in df['Body']: try: prompt_ = f"system:{prompt}<|end|>user:{i}<|end|>assistant:" output = llm(prompt_, max_tokens=1000, temperature=0.1) print("*"*50) t = output['choices'][0]['text'] print('input : ',i) print('output : ',t) t = t[t.find('{\n'):t.find('}\n')+1] print('json data : ', t) extracted_details = json.loads(t) print('extracted json : ',type(extracted_details)) print(type(extracted_details)) print('_'*50) # print(extracted_details) print(type(extracted_details)) meta_data = { 'sender': 'sender', 'receiver': 'receiver', 'cc': 'cc', 'bcc': 'bcc', 'subject': 'subject' } # print(type(meta_data)) extracted_details.update(meta_data) print('full data about email ! ...::',extracted_details) insert_data(extracted_details) except Exception as e: print(f"Error processing email ID {i}: {e}") except Exception as e: print(f"Error reading csv emails: {e}") # Global variables running = False loop_thread = None # HTML content for the web interface html_content = """ Email Processing

Email Processing Status: {{ status }}

""" # Function to process emails in a loop def email_processing_loop(): global running logging.info("Starting email processing loop...") data = read_email() print("$" * 100) # Simulate email processing time.sleep(10) # Check for new emails every 10 seconds return data # Endpoint to display the current email processor status @app.get("/", response_class=HTMLResponse) async def home(): global running status = "Running" if running else "Stopped" return HTMLResponse(content=html_content.replace("{{ status }}", status), status_code=200) # Endpoint to start the email processing loop @app.post("/start") async def start_email_loop(): global running, loop_thread if not running: running = True # loop_thread = threading.Thread(target=email_processing_loop, daemon=True) # loop_thread.start() logging.info("Email processing loop started.") data = email_processing_loop() return "Running \n\n"+ data else: return "Already running \n\n"+data # Endpoint to stop the email processing loop @app.post("/stop") async def stop_email_loop(): global running if running: running = False logging.info("Email processing loop stopped.") return "Stopped" else: return "Already stopped" if __name__ == "__main__": logging.basicConfig(level=logging.INFO) logging.info("Starting FastAPI server...") import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)