Clawith/backend/app/services/email_service.py

437 lines
15 KiB
Python

"""Email service — IMAP/SMTP email operations for agent tools.
Supports all major email providers via preset configurations.
Each agent stores its own email credentials in per-agent tool config.
"""
import imaplib
import socket
import smtplib
import ssl
import email as email_lib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
from email.header import decode_header
from email.utils import parseaddr, formataddr, make_msgid
from datetime import datetime
from pathlib import Path
from typing import Optional
from app.core.email import force_ipv4, send_smtp_email
# Preset email provider configurations
EMAIL_PROVIDERS = {
"qq": {
"label": "QQ Mail",
"imap_host": "imap.qq.com",
"imap_port": 993,
"smtp_host": "smtp.qq.com",
"smtp_port": 465,
"smtp_ssl": True,
"help_url": "https://service.mail.qq.com/detail/0/310",
"help_text": "Settings → Account → POP3/IMAP/SMTP → Enable IMAP → Generate authorization code",
},
"163": {
"label": "163 Mail",
"imap_host": "imap.163.com",
"imap_port": 993,
"smtp_host": "smtp.163.com",
"smtp_port": 465,
"smtp_ssl": True,
"help_url": "https://help.mail.163.com/faqDetail.do?code=d7a5dc8471cd0c0e8b4b8f4f8e49998b374173cfe9171305fa1ce630d7f67ac2",
"help_text": "Settings → POP3/SMTP/IMAP → Enable IMAP → Set authorization code",
},
"gmail": {
"label": "Gmail",
"imap_host": "imap.gmail.com",
"imap_port": 993,
"smtp_host": "smtp.gmail.com",
"smtp_port": 465,
"smtp_ssl": True,
"help_url": "https://support.google.com/accounts/answer/185833",
"help_text": "Google Account → Security → App passwords → Generate app password",
},
"outlook": {
"label": "Outlook / Microsoft 365",
"imap_host": "outlook.office365.com",
"imap_port": 993,
"smtp_host": "smtp.office365.com",
"smtp_port": 587,
"smtp_ssl": False, # Uses STARTTLS
"help_url": "https://support.microsoft.com/en-us/account-billing/manage-app-passwords-for-two-step-verification-d6dc8c6d-4bf7-4851-ad95-6d07799387e9",
"help_text": "Microsoft Account → Security → App passwords",
},
"qq_enterprise": {
"label": "Tencent Enterprise Mail",
"imap_host": "imap.exmail.qq.com",
"imap_port": 993,
"smtp_host": "smtp.exmail.qq.com",
"smtp_port": 465,
"smtp_ssl": True,
"help_url": "https://open.work.weixin.qq.com/help2/pc/18624",
"help_text": "Enterprise Mail → Settings → Client-specific password → Generate new password",
},
"aliyun": {
"label": "Alibaba Enterprise Mail",
"imap_host": "imap.qiye.aliyun.com",
"imap_port": 993,
"smtp_host": "smtp.qiye.aliyun.com",
"smtp_port": 465,
"smtp_ssl": True,
"help_url": "",
"help_text": "Use your email password directly",
},
}
def resolve_config(config: dict) -> dict:
"""Resolve a user config into full IMAP/SMTP settings using provider presets."""
provider = config.get("email_provider", "custom")
result = {
"email_address": config.get("email_address", ""),
"auth_code": config.get("auth_code", ""),
"imap_host": config.get("imap_host", ""),
"imap_port": int(config.get("imap_port", 993)),
"smtp_host": config.get("smtp_host", ""),
"smtp_port": int(config.get("smtp_port", 465)),
"smtp_ssl": config.get("smtp_ssl", True),
}
if provider != "custom" and provider in EMAIL_PROVIDERS:
preset = EMAIL_PROVIDERS[provider]
result["imap_host"] = preset["imap_host"]
result["imap_port"] = preset["imap_port"]
result["smtp_host"] = preset["smtp_host"]
result["smtp_port"] = preset["smtp_port"]
result["smtp_ssl"] = preset["smtp_ssl"]
return result
def _decode_header_value(value: str) -> str:
"""Decode an email header value (handles encoded words)."""
if not value:
return ""
decoded_parts = decode_header(value)
result = []
for part, charset in decoded_parts:
if isinstance(part, bytes):
result.append(part.decode(charset or "utf-8", errors="replace"))
else:
result.append(part)
return "".join(result)
def _extract_body(msg) -> str:
"""Extract the plain text body from an email message."""
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
content_disposition = str(part.get("Content-Disposition", ""))
if content_type == "text/plain" and "attachment" not in content_disposition:
payload = part.get_payload(decode=True)
if payload:
charset = part.get_content_charset() or "utf-8"
return payload.decode(charset, errors="replace")
# Fallback to HTML if no plain text
for part in msg.walk():
content_type = part.get_content_type()
if content_type == "text/html":
payload = part.get_payload(decode=True)
if payload:
charset = part.get_content_charset() or "utf-8"
return f"[HTML content]\n{payload.decode(charset, errors='replace')[:2000]}"
else:
payload = msg.get_payload(decode=True)
if payload:
charset = msg.get_content_charset() or "utf-8"
return payload.decode(charset, errors="replace")
return ""
async def send_email(
config: dict,
to: str,
subject: str,
body: str,
cc: Optional[str] = None,
attachments: Optional[list[str]] = None,
workspace_path: Optional[Path] = None,
) -> str:
"""Send an email via SMTP.
Args:
config: Resolved email config (from resolve_config)
to: Recipient email address(es), comma-separated
subject: Email subject
body: Email body text
cc: CC recipients, comma-separated
attachments: List of workspace-relative file paths to attach
workspace_path: Agent workspace root for resolving attachment paths
"""
cfg = resolve_config(config)
addr = cfg["email_address"]
password = cfg["auth_code"]
if not addr or not password:
return "❌ Email not configured. Please set email address and authorization code in tool config."
msg = MIMEMultipart()
msg["From"] = addr
msg["To"] = to
msg["Subject"] = subject
if cc:
msg["Cc"] = cc
msg["Message-ID"] = make_msgid()
msg["Date"] = datetime.now().strftime("%a, %d %b %Y %H:%M:%S %z")
msg.attach(MIMEText(body, "plain", "utf-8"))
# Attach files
if attachments and workspace_path:
for rel_path in attachments:
full_path = workspace_path / rel_path
if full_path.exists() and full_path.is_file():
with open(full_path, "rb") as f:
part = MIMEBase("application", "octet-stream")
part.set_payload(f.read())
encoders.encode_base64(part)
part.add_header("Content-Disposition", f"attachment; filename={full_path.name}")
msg.attach(part)
try:
recipients = [r.strip() for r in to.split(",")]
if cc:
recipients += [r.strip() for r in cc.split(",")]
send_smtp_email(
host=cfg["smtp_host"],
port=cfg["smtp_port"],
user=addr,
password=password,
from_addr=addr,
to_addrs=recipients,
msg_string=msg.as_string(),
use_ssl=cfg.get("smtp_ssl", True),
timeout=15,
)
return f"✅ Email sent to {to}" + (f" (CC: {cc})" if cc else "")
except smtplib.SMTPAuthenticationError:
return "❌ SMTP authentication failed. Please check your email address and authorization code."
except Exception as e:
return f"❌ Failed to send email: {str(e)[:200]}"
async def read_emails(
config: dict,
limit: int = 10,
search: Optional[str] = None,
folder: str = "INBOX",
) -> str:
"""Read emails from IMAP mailbox.
Args:
config: Resolved email config
limit: Max number of emails to return
search: Optional IMAP search criteria (e.g. 'FROM "john"', 'SUBJECT "hello"')
folder: Mailbox folder (default INBOX)
"""
cfg = resolve_config(config)
addr = cfg["email_address"]
password = cfg["auth_code"]
if not addr or not password:
return "❌ Email not configured. Please set email address and authorization code in tool config."
limit = min(limit, 30) # Cap at 30
try:
with force_ipv4():
context = ssl.create_default_context()
with imaplib.IMAP4_SSL(cfg["imap_host"], cfg["imap_port"], ssl_context=context) as mail:
mail.login(addr, password)
mail.select(folder, readonly=True)
# Search
if search:
_, msg_nums = mail.search(None, search)
else:
_, msg_nums = mail.search(None, "ALL")
msg_ids = msg_nums[0].split()
if not msg_ids:
return "📭 No emails found."
# Get latest N emails
latest_ids = msg_ids[-limit:]
latest_ids.reverse() # Newest first
results = []
for mid in latest_ids:
_, msg_data = mail.fetch(mid, "(RFC822)")
if not msg_data or not msg_data[0]:
continue
raw = msg_data[0][1]
msg = email_lib.message_from_bytes(raw)
from_addr = _decode_header_value(msg.get("From", ""))
subject = _decode_header_value(msg.get("Subject", "(No subject)"))
date_str = msg.get("Date", "")
message_id = msg.get("Message-ID", "")
body = _extract_body(msg)
# Truncate body for readability
if len(body) > 500:
body = body[:500] + "..."
results.append(
f"---\n"
f"**From:** {from_addr}\n"
f"**Subject:** {subject}\n"
f"**Date:** {date_str}\n"
f"**Message-ID:** {message_id}\n"
f"**Body:**\n{body}"
)
header = f"📬 {len(results)} email(s) from {folder}:\n\n"
return header + "\n\n".join(results)
except imaplib.IMAP4.error as e:
err = str(e)
if "LOGIN" in err.upper() or "AUTH" in err.upper():
return "❌ IMAP authentication failed. Please check your email address and authorization code."
return f"❌ IMAP error: {err[:200]}"
except Exception as e:
return f"❌ Failed to read emails: {str(e)[:200]}"
async def reply_email(
config: dict,
message_id: str,
body: str,
folder: str = "INBOX",
) -> str:
"""Reply to an email by Message-ID.
Args:
config: Resolved email config
message_id: Message-ID of the email to reply to
body: Reply body text
folder: Mailbox folder to search in
"""
cfg = resolve_config(config)
addr = cfg["email_address"]
password = cfg["auth_code"]
if not addr or not password:
return "❌ Email not configured."
try:
with force_ipv4():
# First, fetch the original email to get From/Subject
context = ssl.create_default_context()
original_from = ""
original_subject = ""
with imaplib.IMAP4_SSL(cfg["imap_host"], cfg["imap_port"], ssl_context=context) as mail:
mail.login(addr, password)
mail.select(folder, readonly=True)
_, msg_nums = mail.search(None, f'HEADER Message-ID "{message_id}"')
msg_ids = msg_nums[0].split()
if not msg_ids:
return f"❌ Original email not found with Message-ID: {message_id}"
_, msg_data = mail.fetch(msg_ids[0], "(RFC822)")
raw = msg_data[0][1]
original = email_lib.message_from_bytes(raw)
original_from = original.get("From", "")
original_subject = _decode_header_value(original.get("Subject", ""))
# Build reply
reply_subject = original_subject if original_subject.lower().startswith("re:") else f"Re: {original_subject}"
reply_msg = MIMEMultipart()
reply_msg["From"] = addr
reply_msg["To"] = parseaddr(original_from)[1] or original_from
reply_msg["Subject"] = reply_subject
reply_msg["In-Reply-To"] = message_id
reply_msg["References"] = message_id
reply_msg["Message-ID"] = make_msgid()
reply_msg.attach(MIMEText(body, "plain", "utf-8"))
# Send
send_smtp_email(
host=cfg["smtp_host"],
port=cfg["smtp_port"],
user=addr,
password=password,
from_addr=addr,
to_addrs=[reply_msg["To"]],
msg_string=reply_msg.as_string(),
use_ssl=cfg.get("smtp_ssl", True),
timeout=15,
)
return f"✅ Reply sent to {reply_msg['To']} (Subject: {reply_subject})"
except Exception as e:
return f"❌ Failed to reply: {str(e)[:200]}"
async def test_connection(config: dict) -> dict:
"""Test IMAP and SMTP connections.
Returns dict with 'ok' (bool), 'imap' (str), 'smtp' (str) status messages.
"""
cfg = resolve_config(config)
addr = cfg["email_address"]
password = cfg["auth_code"]
if not addr or not password:
return {"ok": False, "error": "Email address and authorization code are required."}
result = {"ok": True, "imap": "", "smtp": ""}
# Test IMAP
try:
with force_ipv4():
context = ssl.create_default_context()
with imaplib.IMAP4_SSL(cfg["imap_host"], cfg["imap_port"], ssl_context=context) as mail:
mail.login(addr, password)
mail.select("INBOX", readonly=True)
_, msg_nums = mail.search(None, "ALL")
count = len(msg_nums[0].split()) if msg_nums[0] else 0
result["imap"] = f"✅ IMAP connected ({count} emails in INBOX)"
except imaplib.IMAP4.error as e:
result["ok"] = False
result["imap"] = f"❌ IMAP failed: {str(e)[:150]}"
except Exception as e:
result["ok"] = False
result["imap"] = f"❌ IMAP error: {str(e)[:150]}"
# Test SMTP
try:
send_smtp_email(
host=cfg["smtp_host"],
port=cfg["smtp_port"],
user=addr,
password=password,
from_addr=addr,
to_addrs=[addr], # Send to self for test
msg_string="Subject: Clawith Connection Test\n\nSMTP Connection Successful.",
use_ssl=cfg.get("smtp_ssl", True),
timeout=10,
)
result["smtp"] = "✅ SMTP connected"
except smtplib.SMTPAuthenticationError:
result["ok"] = False
result["smtp"] = "❌ SMTP authentication failed"
except Exception as e:
result["ok"] = False
result["smtp"] = f"❌ SMTP error: {str(e)[:150]}"
return result