import streamlit as st from googleapiclient.discovery import build from google_auth_oauthlib.flow import InstalledAppFlow from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials import os import json import pandas as pd import base64 import io import sqlite3 from fpdf import FPDF SCOPES = ['https://www.googleapis.com/auth/gmail.readonly'] # Initialize session state variables if "authenticated" not in st.session_state: st.session_state.authenticated = False if "creds" not in st.session_state: st.session_state.creds = None if "auth_url" not in st.session_state: st.session_state.auth_url = None if "auth_code" not in st.session_state: st.session_state.auth_code = "" if "flow" not in st.session_state: st.session_state.flow = None # Authenticate Gmail API def authenticate_gmail(credentials_file, user_email, app_password): if os.path.exists('token.json'): try: creds = Credentials.from_authorized_user_file('token.json', SCOPES) if creds and creds.valid: st.session_state.creds = creds st.session_state.authenticated = True st.success("Authentication successful!") return creds except Exception as e: st.error(f"Invalid token.json file: {e}") os.remove('token.json') if not st.session_state.creds or not st.session_state.creds.valid: if st.session_state.creds and st.session_state.creds.expired and st.session_state.creds.refresh_token: st.session_state.creds.refresh(Request()) st.session_state.authenticated = True st.success("Authentication successful!") return st.session_state.creds else: if not st.session_state.flow: st.session_state.flow = InstalledAppFlow.from_client_secrets_file(credentials_file, SCOPES) st.session_state.flow.redirect_uri = 'http://localhost' auth_url, _ = st.session_state.flow.authorization_url(prompt='consent') st.session_state.auth_url = auth_url st.info("Please visit this URL to authorize the application:") st.code(st.session_state.auth_url) # Submit Authentication Code def submit_auth_code(): try: st.session_state.flow.fetch_token(code=st.session_state.auth_code) st.session_state.creds = st.session_state.flow.credentials st.session_state.authenticated = True with open('token.json', 'w') as token_file: json.dump({ "token": st.session_state.creds.token, "refresh_token": st.session_state.creds.refresh_token, "token_uri": st.session_state.creds.token_uri, "client_id": st.session_state.creds.client_id, "client_secret": st.session_state.creds.client_secret, "scopes": st.session_state.creds.scopes }, token_file) st.success("Authentication successful!") except Exception as e: st.error(f"Error during authentication: {e}") # Fetch Emails from Gmail API def fetch_emails(service, label): emails = [] total_fetched = 0 next_page_token = None while True: results = service.users().messages().list( userId='me', labelIds=[label], maxResults=500, pageToken=next_page_token ).execute() messages = results.get('messages', []) for message in messages: msg = service.users().messages().get(userId='me', id=message['id'], format='full').execute() headers = {header['name']: header['value'] for header in msg['payload']['headers']} body = '' if 'parts' in msg['payload']: for part in msg['payload']['parts']: if part['mimeType'] == 'text/plain': body = base64.urlsafe_b64decode(part['body'].get('data', '').encode('UTF-8')).decode('UTF-8') email_data = { "Date": headers.get('Date', ''), "From": headers.get('From', ''), "To": headers.get('To', ''), "Subject": headers.get('Subject', ''), "Body": body, } emails.append(email_data) total_fetched += len(messages) st.info(f"Total emails fetched: {total_fetched}") next_page_token = results.get('nextPageToken') if not next_page_token: break st.success(f"Fetched {total_fetched} emails from {label}.") return emails # Export to PDF def export_to_pdf(df): pdf = FPDF() pdf.set_auto_page_break(auto=True, margin=15) pdf.add_page() pdf.set_font("Arial", size=12) for i, row in df.iterrows(): pdf.cell(200, 10, txt=f"Date: {row['Date']} | From: {row['From']} | To: {row['To']} | Subject: {row['Subject']}", ln=True) pdf.multi_cell(200, 10, txt=f"Body: {row['Body']}") pdf.ln() pdf_buffer = io.BytesIO() pdf.output(pdf_buffer) pdf_buffer.seek(0) return pdf_buffer # Main Page st.title("Gmail Email Fetcher") # Navigation Slider st.sidebar.title("Navigation") st.sidebar.markdown(""" 1. **Upload credentials.json**: Provide the credentials file for Gmail API. 2. **Authenticate**: Use your Gmail email and app password to authenticate. 3. **Fetch Emails**: Select the label and start fetching emails. 4. **Download**: Choose a format to download fetched emails. """) user_email = st.text_input("Enter your Gmail email") app_password = st.text_input("Enter your App Password", type="password") credentials_file = st.file_uploader("Upload credentials.json", type="json") if credentials_file and st.button("Authenticate"): with open("credentials.json", "wb") as f: f.write(credentials_file.getbuffer()) authenticate_gmail("credentials.json", user_email, app_password) if st.session_state.auth_url: st.text_input("Enter the authorization code:", key="auth_code") if st.button("Submit Authentication Code"): submit_auth_code() if st.session_state.authenticated: st.success("You are authenticated!") service = build('gmail', 'v1', credentials=st.session_state.creds) label = st.selectbox("Select Label", ["INBOX", "SENT", "DRAFTS", "TRASH", "SPAM"], key="label_selector") if st.button("Fetch Emails"): st.info("Fetching emails... This may take a while.") emails = fetch_emails(service, label) df = pd.DataFrame(emails) st.write("### Download Options:") col1, col2, col3, col4 = st.columns(4) with col1: csv = df.to_csv(index=False).encode('utf-8') st.download_button("Download as CSV", csv, f"{label}_emails.csv", "text/csv") with col2: excel_buffer = io.BytesIO() with pd.ExcelWriter(excel_buffer, engine='openpyxl') as writer: df.to_excel(writer, index=False) excel_buffer.seek(0) st.download_button("Download as Excel", excel_buffer, f"{label}_emails.xlsx", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet") with col3: pdf_buffer = export_to_pdf(df) st.download_button("Download as PDF", pdf_buffer, f"{label}_emails.pdf", "application/pdf") with col4: conn = sqlite3.connect(':memory:') df.to_sql('emails', conn, index=False, if_exists='replace') sql_buffer = io.BytesIO() with open('emails.db', 'wb') as f: for line in conn.iterdump(): f.write(f'{line}\n'.encode('utf-8')) sql_buffer.seek(0) st.download_button("Download as SQL", sql_buffer, "emails.db", "application/octet-stream") else: st.warning("You are not authenticated yet.")