437 lines
15 KiB
Python
437 lines
15 KiB
Python
"""Email service — IMAP/SMTP email operations for agent tools.
|
|
|
|
Supports all major email providers via preset configurations.
|
|
Each agent stores its own email credentials in per-agent tool config.
|
|
"""
|
|
|
|
import imaplib
|
|
import socket
|
|
import smtplib
|
|
import ssl
|
|
import email as email_lib
|
|
from email.mime.text import MIMEText
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.mime.base import MIMEBase
|
|
from email import encoders
|
|
from email.header import decode_header
|
|
from email.utils import parseaddr, formataddr, make_msgid
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from app.core.email import force_ipv4, send_smtp_email
|
|
|
|
# Preset email provider configurations
|
|
EMAIL_PROVIDERS = {
|
|
"qq": {
|
|
"label": "QQ Mail",
|
|
"imap_host": "imap.qq.com",
|
|
"imap_port": 993,
|
|
"smtp_host": "smtp.qq.com",
|
|
"smtp_port": 465,
|
|
"smtp_ssl": True,
|
|
"help_url": "https://service.mail.qq.com/detail/0/310",
|
|
"help_text": "Settings → Account → POP3/IMAP/SMTP → Enable IMAP → Generate authorization code",
|
|
},
|
|
"163": {
|
|
"label": "163 Mail",
|
|
"imap_host": "imap.163.com",
|
|
"imap_port": 993,
|
|
"smtp_host": "smtp.163.com",
|
|
"smtp_port": 465,
|
|
"smtp_ssl": True,
|
|
"help_url": "https://help.mail.163.com/faqDetail.do?code=d7a5dc8471cd0c0e8b4b8f4f8e49998b374173cfe9171305fa1ce630d7f67ac2",
|
|
"help_text": "Settings → POP3/SMTP/IMAP → Enable IMAP → Set authorization code",
|
|
},
|
|
"gmail": {
|
|
"label": "Gmail",
|
|
"imap_host": "imap.gmail.com",
|
|
"imap_port": 993,
|
|
"smtp_host": "smtp.gmail.com",
|
|
"smtp_port": 465,
|
|
"smtp_ssl": True,
|
|
"help_url": "https://support.google.com/accounts/answer/185833",
|
|
"help_text": "Google Account → Security → App passwords → Generate app password",
|
|
},
|
|
"outlook": {
|
|
"label": "Outlook / Microsoft 365",
|
|
"imap_host": "outlook.office365.com",
|
|
"imap_port": 993,
|
|
"smtp_host": "smtp.office365.com",
|
|
"smtp_port": 587,
|
|
"smtp_ssl": False, # Uses STARTTLS
|
|
"help_url": "https://support.microsoft.com/en-us/account-billing/manage-app-passwords-for-two-step-verification-d6dc8c6d-4bf7-4851-ad95-6d07799387e9",
|
|
"help_text": "Microsoft Account → Security → App passwords",
|
|
},
|
|
"qq_enterprise": {
|
|
"label": "Tencent Enterprise Mail",
|
|
"imap_host": "imap.exmail.qq.com",
|
|
"imap_port": 993,
|
|
"smtp_host": "smtp.exmail.qq.com",
|
|
"smtp_port": 465,
|
|
"smtp_ssl": True,
|
|
"help_url": "https://open.work.weixin.qq.com/help2/pc/18624",
|
|
"help_text": "Enterprise Mail → Settings → Client-specific password → Generate new password",
|
|
},
|
|
"aliyun": {
|
|
"label": "Alibaba Enterprise Mail",
|
|
"imap_host": "imap.qiye.aliyun.com",
|
|
"imap_port": 993,
|
|
"smtp_host": "smtp.qiye.aliyun.com",
|
|
"smtp_port": 465,
|
|
"smtp_ssl": True,
|
|
"help_url": "",
|
|
"help_text": "Use your email password directly",
|
|
},
|
|
}
|
|
|
|
|
|
def resolve_config(config: dict) -> dict:
|
|
"""Resolve a user config into full IMAP/SMTP settings using provider presets."""
|
|
provider = config.get("email_provider", "custom")
|
|
result = {
|
|
"email_address": config.get("email_address", ""),
|
|
"auth_code": config.get("auth_code", ""),
|
|
"imap_host": config.get("imap_host", ""),
|
|
"imap_port": int(config.get("imap_port", 993)),
|
|
"smtp_host": config.get("smtp_host", ""),
|
|
"smtp_port": int(config.get("smtp_port", 465)),
|
|
"smtp_ssl": config.get("smtp_ssl", True),
|
|
}
|
|
|
|
if provider != "custom" and provider in EMAIL_PROVIDERS:
|
|
preset = EMAIL_PROVIDERS[provider]
|
|
result["imap_host"] = preset["imap_host"]
|
|
result["imap_port"] = preset["imap_port"]
|
|
result["smtp_host"] = preset["smtp_host"]
|
|
result["smtp_port"] = preset["smtp_port"]
|
|
result["smtp_ssl"] = preset["smtp_ssl"]
|
|
|
|
return result
|
|
|
|
|
|
def _decode_header_value(value: str) -> str:
|
|
"""Decode an email header value (handles encoded words)."""
|
|
if not value:
|
|
return ""
|
|
decoded_parts = decode_header(value)
|
|
result = []
|
|
for part, charset in decoded_parts:
|
|
if isinstance(part, bytes):
|
|
result.append(part.decode(charset or "utf-8", errors="replace"))
|
|
else:
|
|
result.append(part)
|
|
return "".join(result)
|
|
|
|
|
|
def _extract_body(msg) -> str:
|
|
"""Extract the plain text body from an email message."""
|
|
if msg.is_multipart():
|
|
for part in msg.walk():
|
|
content_type = part.get_content_type()
|
|
content_disposition = str(part.get("Content-Disposition", ""))
|
|
if content_type == "text/plain" and "attachment" not in content_disposition:
|
|
payload = part.get_payload(decode=True)
|
|
if payload:
|
|
charset = part.get_content_charset() or "utf-8"
|
|
return payload.decode(charset, errors="replace")
|
|
# Fallback to HTML if no plain text
|
|
for part in msg.walk():
|
|
content_type = part.get_content_type()
|
|
if content_type == "text/html":
|
|
payload = part.get_payload(decode=True)
|
|
if payload:
|
|
charset = part.get_content_charset() or "utf-8"
|
|
return f"[HTML content]\n{payload.decode(charset, errors='replace')[:2000]}"
|
|
else:
|
|
payload = msg.get_payload(decode=True)
|
|
if payload:
|
|
charset = msg.get_content_charset() or "utf-8"
|
|
return payload.decode(charset, errors="replace")
|
|
return ""
|
|
|
|
|
|
async def send_email(
|
|
config: dict,
|
|
to: str,
|
|
subject: str,
|
|
body: str,
|
|
cc: Optional[str] = None,
|
|
attachments: Optional[list[str]] = None,
|
|
workspace_path: Optional[Path] = None,
|
|
) -> str:
|
|
"""Send an email via SMTP.
|
|
|
|
Args:
|
|
config: Resolved email config (from resolve_config)
|
|
to: Recipient email address(es), comma-separated
|
|
subject: Email subject
|
|
body: Email body text
|
|
cc: CC recipients, comma-separated
|
|
attachments: List of workspace-relative file paths to attach
|
|
workspace_path: Agent workspace root for resolving attachment paths
|
|
"""
|
|
cfg = resolve_config(config)
|
|
addr = cfg["email_address"]
|
|
password = cfg["auth_code"]
|
|
|
|
if not addr or not password:
|
|
return "❌ Email not configured. Please set email address and authorization code in tool config."
|
|
|
|
msg = MIMEMultipart()
|
|
msg["From"] = addr
|
|
msg["To"] = to
|
|
msg["Subject"] = subject
|
|
if cc:
|
|
msg["Cc"] = cc
|
|
msg["Message-ID"] = make_msgid()
|
|
msg["Date"] = datetime.now().strftime("%a, %d %b %Y %H:%M:%S %z")
|
|
|
|
msg.attach(MIMEText(body, "plain", "utf-8"))
|
|
|
|
# Attach files
|
|
if attachments and workspace_path:
|
|
for rel_path in attachments:
|
|
full_path = workspace_path / rel_path
|
|
if full_path.exists() and full_path.is_file():
|
|
with open(full_path, "rb") as f:
|
|
part = MIMEBase("application", "octet-stream")
|
|
part.set_payload(f.read())
|
|
encoders.encode_base64(part)
|
|
part.add_header("Content-Disposition", f"attachment; filename={full_path.name}")
|
|
msg.attach(part)
|
|
|
|
try:
|
|
recipients = [r.strip() for r in to.split(",")]
|
|
if cc:
|
|
recipients += [r.strip() for r in cc.split(",")]
|
|
|
|
send_smtp_email(
|
|
host=cfg["smtp_host"],
|
|
port=cfg["smtp_port"],
|
|
user=addr,
|
|
password=password,
|
|
from_addr=addr,
|
|
to_addrs=recipients,
|
|
msg_string=msg.as_string(),
|
|
use_ssl=cfg.get("smtp_ssl", True),
|
|
timeout=15,
|
|
)
|
|
|
|
return f"✅ Email sent to {to}" + (f" (CC: {cc})" if cc else "")
|
|
except smtplib.SMTPAuthenticationError:
|
|
return "❌ SMTP authentication failed. Please check your email address and authorization code."
|
|
except Exception as e:
|
|
return f"❌ Failed to send email: {str(e)[:200]}"
|
|
|
|
|
|
async def read_emails(
|
|
config: dict,
|
|
limit: int = 10,
|
|
search: Optional[str] = None,
|
|
folder: str = "INBOX",
|
|
) -> str:
|
|
"""Read emails from IMAP mailbox.
|
|
|
|
Args:
|
|
config: Resolved email config
|
|
limit: Max number of emails to return
|
|
search: Optional IMAP search criteria (e.g. 'FROM "john"', 'SUBJECT "hello"')
|
|
folder: Mailbox folder (default INBOX)
|
|
"""
|
|
cfg = resolve_config(config)
|
|
addr = cfg["email_address"]
|
|
password = cfg["auth_code"]
|
|
|
|
if not addr or not password:
|
|
return "❌ Email not configured. Please set email address and authorization code in tool config."
|
|
|
|
limit = min(limit, 30) # Cap at 30
|
|
|
|
try:
|
|
with force_ipv4():
|
|
context = ssl.create_default_context()
|
|
with imaplib.IMAP4_SSL(cfg["imap_host"], cfg["imap_port"], ssl_context=context) as mail:
|
|
mail.login(addr, password)
|
|
mail.select(folder, readonly=True)
|
|
|
|
# Search
|
|
if search:
|
|
_, msg_nums = mail.search(None, search)
|
|
else:
|
|
_, msg_nums = mail.search(None, "ALL")
|
|
|
|
msg_ids = msg_nums[0].split()
|
|
if not msg_ids:
|
|
return "📭 No emails found."
|
|
|
|
# Get latest N emails
|
|
latest_ids = msg_ids[-limit:]
|
|
latest_ids.reverse() # Newest first
|
|
|
|
results = []
|
|
for mid in latest_ids:
|
|
_, msg_data = mail.fetch(mid, "(RFC822)")
|
|
if not msg_data or not msg_data[0]:
|
|
continue
|
|
raw = msg_data[0][1]
|
|
msg = email_lib.message_from_bytes(raw)
|
|
|
|
from_addr = _decode_header_value(msg.get("From", ""))
|
|
subject = _decode_header_value(msg.get("Subject", "(No subject)"))
|
|
date_str = msg.get("Date", "")
|
|
message_id = msg.get("Message-ID", "")
|
|
body = _extract_body(msg)
|
|
# Truncate body for readability
|
|
if len(body) > 500:
|
|
body = body[:500] + "..."
|
|
|
|
results.append(
|
|
f"---\n"
|
|
f"**From:** {from_addr}\n"
|
|
f"**Subject:** {subject}\n"
|
|
f"**Date:** {date_str}\n"
|
|
f"**Message-ID:** {message_id}\n"
|
|
f"**Body:**\n{body}"
|
|
)
|
|
|
|
header = f"📬 {len(results)} email(s) from {folder}:\n\n"
|
|
return header + "\n\n".join(results)
|
|
|
|
except imaplib.IMAP4.error as e:
|
|
err = str(e)
|
|
if "LOGIN" in err.upper() or "AUTH" in err.upper():
|
|
return "❌ IMAP authentication failed. Please check your email address and authorization code."
|
|
return f"❌ IMAP error: {err[:200]}"
|
|
except Exception as e:
|
|
return f"❌ Failed to read emails: {str(e)[:200]}"
|
|
|
|
|
|
async def reply_email(
|
|
config: dict,
|
|
message_id: str,
|
|
body: str,
|
|
folder: str = "INBOX",
|
|
) -> str:
|
|
"""Reply to an email by Message-ID.
|
|
|
|
Args:
|
|
config: Resolved email config
|
|
message_id: Message-ID of the email to reply to
|
|
body: Reply body text
|
|
folder: Mailbox folder to search in
|
|
"""
|
|
cfg = resolve_config(config)
|
|
addr = cfg["email_address"]
|
|
password = cfg["auth_code"]
|
|
|
|
if not addr or not password:
|
|
return "❌ Email not configured."
|
|
|
|
try:
|
|
with force_ipv4():
|
|
# First, fetch the original email to get From/Subject
|
|
context = ssl.create_default_context()
|
|
original_from = ""
|
|
original_subject = ""
|
|
|
|
with imaplib.IMAP4_SSL(cfg["imap_host"], cfg["imap_port"], ssl_context=context) as mail:
|
|
mail.login(addr, password)
|
|
mail.select(folder, readonly=True)
|
|
_, msg_nums = mail.search(None, f'HEADER Message-ID "{message_id}"')
|
|
msg_ids = msg_nums[0].split()
|
|
if not msg_ids:
|
|
return f"❌ Original email not found with Message-ID: {message_id}"
|
|
|
|
_, msg_data = mail.fetch(msg_ids[0], "(RFC822)")
|
|
raw = msg_data[0][1]
|
|
original = email_lib.message_from_bytes(raw)
|
|
original_from = original.get("From", "")
|
|
original_subject = _decode_header_value(original.get("Subject", ""))
|
|
|
|
# Build reply
|
|
reply_subject = original_subject if original_subject.lower().startswith("re:") else f"Re: {original_subject}"
|
|
|
|
reply_msg = MIMEMultipart()
|
|
reply_msg["From"] = addr
|
|
reply_msg["To"] = parseaddr(original_from)[1] or original_from
|
|
reply_msg["Subject"] = reply_subject
|
|
reply_msg["In-Reply-To"] = message_id
|
|
reply_msg["References"] = message_id
|
|
reply_msg["Message-ID"] = make_msgid()
|
|
|
|
reply_msg.attach(MIMEText(body, "plain", "utf-8"))
|
|
|
|
# Send
|
|
send_smtp_email(
|
|
host=cfg["smtp_host"],
|
|
port=cfg["smtp_port"],
|
|
user=addr,
|
|
password=password,
|
|
from_addr=addr,
|
|
to_addrs=[reply_msg["To"]],
|
|
msg_string=reply_msg.as_string(),
|
|
use_ssl=cfg.get("smtp_ssl", True),
|
|
timeout=15,
|
|
)
|
|
|
|
return f"✅ Reply sent to {reply_msg['To']} (Subject: {reply_subject})"
|
|
|
|
except Exception as e:
|
|
return f"❌ Failed to reply: {str(e)[:200]}"
|
|
|
|
|
|
async def test_connection(config: dict) -> dict:
|
|
"""Test IMAP and SMTP connections.
|
|
|
|
Returns dict with 'ok' (bool), 'imap' (str), 'smtp' (str) status messages.
|
|
"""
|
|
cfg = resolve_config(config)
|
|
addr = cfg["email_address"]
|
|
password = cfg["auth_code"]
|
|
|
|
if not addr or not password:
|
|
return {"ok": False, "error": "Email address and authorization code are required."}
|
|
|
|
result = {"ok": True, "imap": "", "smtp": ""}
|
|
|
|
# Test IMAP
|
|
try:
|
|
with force_ipv4():
|
|
context = ssl.create_default_context()
|
|
with imaplib.IMAP4_SSL(cfg["imap_host"], cfg["imap_port"], ssl_context=context) as mail:
|
|
mail.login(addr, password)
|
|
mail.select("INBOX", readonly=True)
|
|
_, msg_nums = mail.search(None, "ALL")
|
|
count = len(msg_nums[0].split()) if msg_nums[0] else 0
|
|
result["imap"] = f"✅ IMAP connected ({count} emails in INBOX)"
|
|
except imaplib.IMAP4.error as e:
|
|
result["ok"] = False
|
|
result["imap"] = f"❌ IMAP failed: {str(e)[:150]}"
|
|
except Exception as e:
|
|
result["ok"] = False
|
|
result["imap"] = f"❌ IMAP error: {str(e)[:150]}"
|
|
|
|
# Test SMTP
|
|
try:
|
|
send_smtp_email(
|
|
host=cfg["smtp_host"],
|
|
port=cfg["smtp_port"],
|
|
user=addr,
|
|
password=password,
|
|
from_addr=addr,
|
|
to_addrs=[addr], # Send to self for test
|
|
msg_string="Subject: Clawith Connection Test\n\nSMTP Connection Successful.",
|
|
use_ssl=cfg.get("smtp_ssl", True),
|
|
timeout=10,
|
|
)
|
|
result["smtp"] = "✅ SMTP connected"
|
|
except smtplib.SMTPAuthenticationError:
|
|
result["ok"] = False
|
|
result["smtp"] = "❌ SMTP authentication failed"
|
|
except Exception as e:
|
|
result["ok"] = False
|
|
result["smtp"] = f"❌ SMTP error: {str(e)[:150]}"
|
|
|
|
return result
|