|
import streamlit as st |
|
import imaplib |
|
import email |
|
from email.header import decode_header |
|
import torch |
|
from transformers import BertTokenizer, BertForSequenceClassification |
|
import re |
|
|
|
class EmailProcessor:
    """Stateless helpers for fetching mail over IMAP and normalising its text.

    Every method is static; the class only serves as a namespace.
    """

    @staticmethod
    def decode_email_content(content, default_charset='utf-8'):
        """Return *content* as ``str``.

        Args:
            content: Raw ``bytes`` to decode. Any other object is passed
                through ``str()``.
            default_charset: Charset tried first for ``bytes`` input.

        Returns:
            The decoded text. When *default_charset* is unknown to Python or
            cannot decode the bytes, ISO-8859-1 is used instead.
        """
        if not isinstance(content, bytes):
            return str(content)
        try:
            return content.decode(default_charset)
        except (UnicodeDecodeError, LookupError):
            # LookupError: the sender declared a charset Python has no codec
            # for. ISO-8859-1 maps every byte value, so this fallback cannot
            # fail and needs no further error handling.
            return content.decode('iso-8859-1')

    @staticmethod
    def clean_text(text):
        """Strip HTML-like tags and collapse whitespace runs to single spaces."""
        without_tags = re.sub(r'<[^>]+>', '', text)
        return re.sub(r'\s+', ' ', without_tags).strip()

    @staticmethod
    def _decode_subject(raw_subject):
        """Decode an RFC 2047 Subject header value into one string."""
        if raw_subject is None:
            # The message has no Subject header; decode_header() would raise
            # on None and abort the whole fetch.
            return '(no subject)'
        fragments = []
        # A subject may consist of several encoded words, each with its own
        # charset, so every fragment is decoded rather than only the first.
        for chunk, charset in decode_header(raw_subject):
            if isinstance(chunk, bytes):
                chunk = EmailProcessor.decode_email_content(chunk, charset or 'utf-8')
            fragments.append(chunk)
        return ''.join(fragments)

    @staticmethod
    def _extract_body(message):
        """Return the decoded body text of *message* (may be empty)."""
        if message.is_multipart():
            # Only the plain-text parts of a multipart message are used.
            parts = [
                part for part in message.walk()
                if part.get_content_type() == "text/plain"
            ]
        else:
            # A single-part message is taken as-is, whatever its content type.
            parts = [message]

        decoded_parts = []
        for part in parts:
            payload = part.get_payload(decode=True)
            if payload:
                charset = part.get_content_charset() or 'utf-8'
                decoded_parts.append(
                    EmailProcessor.decode_email_content(payload, charset)
                )
        return ''.join(decoded_parts)

    @staticmethod
    def _disconnect(imap):
        """Close the mailbox and log out, ignoring errors from either step."""
        for shutdown_step in (imap.close, imap.logout):
            try:
                shutdown_step()
            except (imaplib.IMAP4.error, OSError):
                # Best-effort cleanup: close() is rejected when no mailbox is
                # selected (e.g. login failed) and the socket may already be
                # gone. Neither should mask the result being returned.
                pass

    @staticmethod
    def get_emails(email_address, password, imap_server, imap_port, max_emails=5):
        """Fetch the last few INBOX messages over IMAP-over-SSL.

        Args:
            email_address: Login name for the IMAP account.
            password: Password (or app password) for the account.
            imap_server: Host name of the IMAP server.
            imap_port: Port of the IMAP server; converted with ``int()``.
            max_emails: How many messages to take from the end of the search
                result. Values of zero or less fetch nothing.

        Returns:
            ``(emails, None)`` on success, where ``emails`` is a list of
            ``{'subject': str, 'content': str}`` dicts (possibly empty), or
            ``(None, error_message)`` on failure. ``error_message`` is never
            empty, so callers can rely on its truthiness.
        """
        imap = None
        try:
            imap = imaplib.IMAP4_SSL(imap_server, int(imap_port))
            imap.login(email_address, password)
            imap.select('INBOX')

            _, message_numbers = imap.search(None, 'ALL')
            if message_numbers and message_numbers[0]:
                message_ids = message_numbers[0].split()
            else:
                message_ids = []
            # A slice of [-0:] would select everything, hence the guard.
            selected_ids = message_ids[-max_emails:] if max_emails > 0 else []

            emails = []
            for num in selected_ids:
                _, msg_data = imap.fetch(num, '(RFC822)')
                # Skip responses that carry no (envelope, bytes) tuple.
                if not msg_data or not isinstance(msg_data[0], tuple):
                    continue
                message = email.message_from_bytes(msg_data[0][1])
                emails.append({
                    'subject': EmailProcessor._decode_subject(message["subject"]),
                    'content': EmailProcessor.clean_text(
                        EmailProcessor._extract_body(message)
                    ),
                })
            return emails, None
        except Exception as e:
            # Some exceptions stringify to ''; fall back to the class name so
            # a failure is never reported with a falsy message.
            return None, str(e) or type(e).__name__
        finally:
            # Release the connection on every path, including failures.
            if imap is not None:
                EmailProcessor._disconnect(imap)
|
|
|
class PhishingDetector:
    """Wrapper around a BERT sequence classifier used to score text."""

    def __init__(self, model_path="./phishing_model"):
        """Load the tokenizer and two-label classifier from *model_path*.

        The model is moved to CUDA when available, otherwise the CPU, and is
        put into evaluation mode.
        """
        use_cuda = torch.cuda.is_available()
        self.device = torch.device("cuda" if use_cuda else "cpu")
        self.tokenizer = BertTokenizer.from_pretrained(model_path)
        classifier = BertForSequenceClassification.from_pretrained(
            model_path,
            num_labels=2,
        )
        self.model = classifier.to(self.device)
        self.model.eval()

    def predict(self, text):
        """Return the softmax probability of label index 1 for *text*.

        The text is first normalised with ``EmailProcessor.clean_text`` and
        truncated to 512 tokens. The UI treats the returned float as the
        phishing risk score.
        """
        with torch.no_grad():
            normalized = EmailProcessor.clean_text(text)
            encoded = self.tokenizer(
                normalized,
                return_tensors="pt",
                truncation=True,
                max_length=512,
                padding=True,
            )
            # Move every input tensor to the same device as the model.
            on_device = {
                name: tensor.to(self.device) for name, tensor in encoded.items()
            }
            logits = self.model(**on_device).logits
            class_probs = torch.softmax(logits, dim=1)
            return class_probs[0, 1].item()
|
|
|
|
|
# Page header. The envelope emoji is restored from its mis-encoded form.
st.title("📧 Email Phishing Detector")
st.write("Connect your email account to analyze messages for potential phishing attempts.")

# Connection settings are collected in the sidebar and read by the email
# analysis section further down.
with st.sidebar:
    st.header("Email Settings")
    email_address = st.text_input("Email Address", key="email_address_input")
    password = st.text_input("Password", type="password", key="password_input")
    imap_server = st.text_input("IMAP Server", value="imap.gmail.com", key="imap_server_input")
    # Integer bounds and step keep the widget value a valid TCP port number.
    imap_port = st.number_input(
        "IMAP Port",
        min_value=1,
        max_value=65535,
        value=993,
        step=1,
        key="imap_port_input",
    )
|
|
|
|
|
@st.cache_resource
def load_detector():
    """Construct a PhishingDetector with its default model path.

    The function is wrapped in ``st.cache_resource`` so Streamlit caches the
    returned instance.
    """
    detector_instance = PhishingDetector()
    return detector_instance
|
|
|
# Load the model once at start-up. ``detector`` is bound before the attempt so
# the name exists even when loading fails; ``model_loaded`` tells the rest of
# the script whether it is usable.
detector = None
try:
    detector = load_detector()
except Exception as e:
    st.error(f"Error loading model: {str(e)}")
    model_loaded = False
else:
    model_loaded = True
|
|
|
|
|
# Manual analysis: score free text pasted by the user.
# NOTE(review): the header emoji was unrecoverable from the mis-encoded
# source; the memo emoji is assumed. Confirm against the original file.
st.markdown("### 📝 Manual Text Analysis")
manual_text = st.text_area("Enter text to analyze:", height=100, key="manual_text_input")
if st.button("Analyze Text", key="analyze_text_btn") and manual_text.strip():
    if not model_loaded:
        # Without this guard a failed model load surfaces as an exception on
        # ``detector.predict`` instead of a readable message.
        st.warning("The phishing model could not be loaded, so text cannot be analyzed.")
    else:
        with st.spinner("Analyzing text..."):
            phishing_score = detector.predict(manual_text)

        # Scores above 0.5 are shown in red, everything else in green.
        risk_color = "red" if phishing_score > 0.5 else "green"
        st.markdown(f"**Phishing Risk Score:** <span style='color:{risk_color}'>{phishing_score:.2%}</span>", unsafe_allow_html=True)

        if phishing_score > 0.8:
            st.error("⚠️ High Risk: This text shows strong indicators of being a phishing attempt!")
        elif phishing_score > 0.5:
            st.warning("⚠️ Medium Risk: This text shows some suspicious characteristics.")
        else:
            st.success("✅ Low Risk: This text appears to be legitimate.")
|
|
|
# Email analysis: fetch recent messages from the configured account and score
# each one. The button is only offered when the model loaded successfully.
st.markdown("### 📨 Email Analysis")
if model_loaded and st.button("Analyze Emails", key="analyze_emails_btn"):
    if not email_address or not password:
        st.warning("Please enter your email credentials.")
    else:
        with st.spinner("Connecting to email..."):
            emails, error = EmailProcessor.get_emails(email_address, password, imap_server, imap_port)

        # get_emails() returns ``None`` for the list on failure. Testing that
        # instead of the truthiness of ``error`` keeps a failure with an empty
        # message from being reported as an empty inbox.
        if emails is None:
            st.error(f"Error connecting to email: {error}")
        elif emails:
            st.success("Successfully retrieved emails!")

            for i, email_data in enumerate(emails):
                with st.expander(f"Email {i+1}: {email_data['subject']}"):
                    phishing_score = detector.predict(email_data['content'])

                    # Scores above 0.5 are shown in red, everything else in green.
                    risk_color = "red" if phishing_score > 0.5 else "green"
                    st.markdown(f"**Phishing Risk Score:** <span style='color:{risk_color}'>{phishing_score:.2%}</span>", unsafe_allow_html=True)

                    if phishing_score > 0.8:
                        st.error("⚠️ High Risk: This email shows strong indicators of being a phishing attempt!")
                    elif phishing_score > 0.5:
                        st.warning("⚠️ Medium Risk: This email shows some suspicious characteristics.")
                    else:
                        st.success("✅ Low Risk: This email appears to be legitimate.")

                    # The per-index key keeps the widgets of different emails distinct.
                    st.text_area("Email Content", email_data['content'], height=100, key=f"email_content_{i}")
        else:
            st.warning("No emails found in inbox.")
|
|
|
# Static help text shown in the sidebar below the settings.
st.sidebar.markdown("---")
# The Gmail bullets are indented so Markdown nests them under item 2 instead
# of starting a separate top-level list.
st.sidebar.markdown("""
### Instructions
1. Enter your email credentials
2. For Gmail:
   - Use an App Password instead of your regular password
   - Enable 2FA and generate an App Password from Google Account settings
3. Click "Analyze Emails" to scan your recent emails
""")

st.sidebar.markdown("---")
st.sidebar.markdown("""
### About
This application uses a BERT-based model to detect phishing attempts in emails.
You can either:
1. Analyze your emails directly by connecting your email account
2. Manually input text to analyze for phishing content
""")
|
|