114 lines
4.0 KiB
Python
114 lines
4.0 KiB
Python
"""Email verification token lifecycle helpers."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import secrets
|
|
import uuid
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
from app.config import get_settings
|
|
from app.core.events import get_redis
|
|
|
|
# Key prefixes for Redis
|
|
TOKEN_PREFIX = "email_verify:token:"
|
|
USER_PREFIX = "email_verify:user:"
|
|
|
|
|
|
class EmailVerificationService:
|
|
"""Email verification token lifecycle helpers."""
|
|
|
|
def _hash_token(self, token: str) -> str:
|
|
"""Hash a raw verification token before persistence or lookup."""
|
|
import hashlib
|
|
return hashlib.sha256(token.encode("utf-8")).hexdigest()
|
|
|
|
async def create_email_verification_token(self, identity_id: uuid.UUID, email: str) -> tuple[str, datetime]:
|
|
"""Create a new 6-digit email verification code and store in Redis."""
|
|
redis = await get_redis()
|
|
user_key = f"{USER_PREFIX}{identity_id}"
|
|
|
|
# Invalidate previous code for this user if exists
|
|
old_code_hash = await redis.get(user_key)
|
|
if old_code_hash:
|
|
await redis.delete(f"{TOKEN_PREFIX}{old_code_hash}")
|
|
|
|
# Generate a random 6-digit code
|
|
import secrets
|
|
raw_code = "".join([str(secrets.randbelow(10)) for _ in range(6)])
|
|
code_hash = self._hash_token(raw_code)
|
|
|
|
now = datetime.now(timezone.utc)
|
|
expiry_minutes = get_settings().EMAIL_VERIFICATION_TOKEN_EXPIRE_MINUTES
|
|
expires_at = now + timedelta(minutes=expiry_minutes)
|
|
|
|
# Store the new code with user_id and email
|
|
token_key = f"{TOKEN_PREFIX}{code_hash}"
|
|
ttl_seconds = int(expiry_minutes * 60)
|
|
|
|
# Store as JSON with identity_id and email
|
|
import json
|
|
token_data = json.dumps({"identity_id": str(identity_id), "email": email})
|
|
|
|
async with redis.pipeline(transaction=True) as pipe:
|
|
pipe.setex(token_key, ttl_seconds, token_data)
|
|
pipe.setex(user_key, ttl_seconds, code_hash)
|
|
await pipe.execute()
|
|
|
|
return raw_code, expires_at
|
|
|
|
async def build_email_verification_url(self, base_url: str, raw_token: str) -> str:
|
|
"""Build the user-facing verification URL. Note: now uses 6-digit code."""
|
|
base = base_url.strip().rstrip("/")
|
|
return f"{base}/verify-email?code={raw_token}"
|
|
|
|
async def consume_email_verification_token(self, raw_token: str) -> dict | None:
|
|
"""Load a valid verification code from Redis and mark it used (by deleting)."""
|
|
import json
|
|
|
|
redis = await get_redis()
|
|
token_hash = self._hash_token(raw_token)
|
|
token_key = f"{TOKEN_PREFIX}{token_hash}"
|
|
|
|
token_data_str = await redis.get(token_key)
|
|
if not token_data_str:
|
|
return None
|
|
|
|
try:
|
|
token_data = json.loads(token_data_str)
|
|
identity_id = uuid.UUID(token_data["identity_id"])
|
|
email = token_data["email"]
|
|
except (json.JSONDecodeError, KeyError, ValueError):
|
|
return None
|
|
|
|
user_key = f"{USER_PREFIX}{identity_id}"
|
|
|
|
# Atomic delete to ensure single-use
|
|
async with redis.pipeline(transaction=True) as pipe:
|
|
pipe.delete(token_key)
|
|
pipe.delete(user_key)
|
|
await pipe.execute()
|
|
|
|
return {"identity_id": identity_id, "email": email}
|
|
|
|
async def send_verification_email(
|
|
self,
|
|
to: str,
|
|
display_name: str,
|
|
verification_code: str,
|
|
expiry_minutes: int,
|
|
) -> None:
|
|
"""Send an email verification code using the configured template."""
|
|
from app.services.system_email_service import send_system_email, render_email_template
|
|
|
|
variables = {
|
|
"display_name": display_name,
|
|
"verification_code": verification_code,
|
|
"expiry_minutes": str(expiry_minutes),
|
|
}
|
|
subject, body = await render_email_template("email_verification", variables)
|
|
await send_system_email(to, subject, body)
|
|
|
|
# Global Instance
|
|
email_verification_service = EmailVerificationService()
|