Clawith/backend/app/services/password_reset_service.py

88 lines
2.9 KiB
Python

"""Password reset token lifecycle helpers."""
from __future__ import annotations
import hashlib
import secrets
import uuid
from datetime import datetime, timedelta, timezone
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import get_settings
from app.core.events import get_redis
from app.models.system_settings import SystemSetting
# Key prefixes for Redis
TOKEN_PREFIX = "pwd_reset:token:"
USER_PREFIX = "pwd_reset:user:"
def _hash_token(token: str) -> str:
"""Hash a raw reset token before persistence or lookup."""
return hashlib.sha256(token.encode("utf-8")).hexdigest()
async def create_password_reset_token(identity_id: uuid.UUID) -> tuple[str, datetime]:
"""Create a new single-use token and invalidate older unused tokens in Redis."""
redis = await get_redis()
user_key = f"{USER_PREFIX}{identity_id}"
# Invalidate previous token for this user if exists
old_token_hash = await redis.get(user_key)
if old_token_hash:
await redis.delete(f"{TOKEN_PREFIX}{old_token_hash}")
raw_token = secrets.token_urlsafe(32)
token_hash = _hash_token(raw_token)
now = datetime.now(timezone.utc)
expiry_minutes = get_settings().PASSWORD_RESET_TOKEN_EXPIRE_MINUTES
expires_at = now + timedelta(minutes=expiry_minutes)
# Store the new token (bi-directional mapping for easy invalidation)
token_key = f"{TOKEN_PREFIX}{token_hash}"
ttl_seconds = int(expiry_minutes * 60)
async with redis.pipeline(transaction=True) as pipe:
pipe.setex(token_key, ttl_seconds, str(identity_id))
pipe.setex(user_key, ttl_seconds, token_hash)
await pipe.execute()
return raw_token, expires_at
async def get_public_base_url(db: AsyncSession) -> str:
"""Resolve the public base URL used for user-facing links."""
from app.services.platform_service import platform_service
return await platform_service.get_public_base_url(db)
async def build_password_reset_url(db: AsyncSession, raw_token: str) -> str:
"""Build the user-facing reset URL."""
base_url = await get_public_base_url(db)
return f"{base_url}/reset-password?token={raw_token}"
async def consume_password_reset_token(raw_token: str) -> dict | None:
"""Load a valid reset token from Redis and mark it used (by deleting)."""
redis = await get_redis()
token_hash = _hash_token(raw_token)
token_key = f"{TOKEN_PREFIX}{token_hash}"
identity_id_str = await redis.get(token_key)
if not identity_id_str:
return None
identity_id = uuid.UUID(identity_id_str)
user_key = f"{USER_PREFIX}{identity_id}"
# Atomic delete to ensure single-use
async with redis.pipeline(transaction=True) as pipe:
pipe.delete(token_key)
pipe.delete(user_key)
await pipe.execute()
return {"identity_id": identity_id}