177 lines
7.1 KiB
Python
177 lines
7.1 KiB
Python
"""Add participants table, extend chat_sessions and chat_messages, migrate messages data, drop messages table."""
|
|
|
|
import uuid
|
|
import sqlalchemy as sa
|
|
from alembic import op
|
|
|
|
revision = "add_participants"
|
|
down_revision = "add_invitation_codes"
|
|
branch_labels = None
|
|
depends_on = None
|
|
|
|
|
|
def upgrade() -> None:
|
|
conn = op.get_bind()
|
|
|
|
# 1. Create participants table (idempotent)
|
|
conn.execute(sa.text("""
|
|
CREATE TABLE IF NOT EXISTS participants (
|
|
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
|
type VARCHAR(10) NOT NULL,
|
|
ref_id UUID NOT NULL,
|
|
display_name VARCHAR(100) NOT NULL,
|
|
avatar_url VARCHAR(500),
|
|
created_at TIMESTAMP WITH TIME ZONE DEFAULT now(),
|
|
CONSTRAINT uq_participants_type_ref UNIQUE (type, ref_id)
|
|
)
|
|
"""))
|
|
conn.execute(sa.text("""
|
|
CREATE INDEX IF NOT EXISTS ix_participants_ref_id ON participants (ref_id)
|
|
"""))
|
|
|
|
# 2. Backfill: create Participant for every existing User
|
|
conn.execute(sa.text("""
|
|
INSERT INTO participants (id, type, ref_id, display_name, avatar_url)
|
|
SELECT gen_random_uuid(), 'user', id, COALESCE(display_name, username), avatar_url
|
|
FROM users
|
|
ON CONFLICT DO NOTHING
|
|
"""))
|
|
|
|
# 3. Backfill: create Participant for every existing Agent
|
|
conn.execute(sa.text("""
|
|
INSERT INTO participants (id, type, ref_id, display_name, avatar_url)
|
|
SELECT gen_random_uuid(), 'agent', id, name, avatar_url
|
|
FROM agents
|
|
ON CONFLICT DO NOTHING
|
|
"""))
|
|
|
|
# 4. Add columns to chat_sessions
|
|
conn.execute(sa.text(
|
|
"ALTER TABLE chat_sessions ADD COLUMN IF NOT EXISTS participant_id UUID REFERENCES participants(id)"
|
|
))
|
|
conn.execute(sa.text(
|
|
"ALTER TABLE chat_sessions ADD COLUMN IF NOT EXISTS peer_agent_id UUID REFERENCES agents(id)"
|
|
))
|
|
conn.execute(sa.text(
|
|
"ALTER TABLE chat_sessions ADD COLUMN IF NOT EXISTS external_conv_id VARCHAR(200)"
|
|
))
|
|
|
|
# 5. Add participant_id to chat_messages
|
|
conn.execute(sa.text(
|
|
"ALTER TABLE chat_messages ADD COLUMN IF NOT EXISTS participant_id UUID REFERENCES participants(id)"
|
|
))
|
|
|
|
# 6. Backfill participant_id on chat_sessions from user_id
|
|
conn.execute(sa.text("""
|
|
UPDATE chat_sessions cs
|
|
SET participant_id = p.id
|
|
FROM participants p
|
|
WHERE p.type = 'user' AND p.ref_id = cs.user_id
|
|
AND cs.participant_id IS NULL
|
|
"""))
|
|
|
|
# 7. Backfill participant_id on chat_messages from user_id
|
|
conn.execute(sa.text("""
|
|
UPDATE chat_messages cm
|
|
SET participant_id = p.id
|
|
FROM participants p
|
|
WHERE p.type = 'user' AND p.ref_id = cm.user_id
|
|
AND cm.participant_id IS NULL
|
|
"""))
|
|
|
|
# 8. Migrate agent-to-agent messages from `messages` table into chat_sessions + chat_messages
|
|
# Only if messages table exists (it may not on fresh installs)
|
|
has_messages = conn.execute(sa.text(
|
|
"SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'messages')"
|
|
)).scalar()
|
|
|
|
if has_messages:
|
|
# Get distinct agent-to-agent conversation pairs
|
|
pairs = conn.execute(sa.text("""
|
|
SELECT DISTINCT
|
|
LEAST(sender_id, receiver_id) AS agent_a,
|
|
GREATEST(sender_id, receiver_id) AS agent_b
|
|
FROM messages
|
|
WHERE sender_type = 'agent' AND receiver_type = 'agent'
|
|
""")).fetchall()
|
|
|
|
for agent_a, agent_b in pairs:
|
|
session_id = str(uuid.uuid4())
|
|
|
|
# Get the sender participant (agent_a)
|
|
part_a = conn.execute(sa.text(
|
|
"SELECT id FROM participants WHERE type = 'agent' AND ref_id = :ref"
|
|
), {"ref": str(agent_a)}).scalar()
|
|
|
|
# Get time bounds
|
|
times = conn.execute(sa.text("""
|
|
SELECT MIN(created_at), MAX(created_at) FROM messages
|
|
WHERE sender_type = 'agent' AND receiver_type = 'agent'
|
|
AND ((sender_id = :a AND receiver_id = :b) OR (sender_id = :b AND receiver_id = :a))
|
|
"""), {"a": str(agent_a), "b": str(agent_b)}).fetchone()
|
|
|
|
# Create ChatSession (agent_a is the session owner, agent_b is peer)
|
|
# user_id = creator of agent_a (for backward compatibility)
|
|
creator_id = conn.execute(sa.text(
|
|
"SELECT creator_id FROM agents WHERE id = :id"
|
|
), {"id": str(agent_a)}).scalar()
|
|
|
|
conn.execute(sa.text("""
|
|
INSERT INTO chat_sessions (id, agent_id, user_id, title, source_channel, participant_id, peer_agent_id, created_at, last_message_at)
|
|
VALUES (:id, :agent_id, :user_id, :title, 'agent', :participant_id, :peer_agent_id, :created_at, :last_msg_at)
|
|
"""), {
|
|
"id": session_id,
|
|
"agent_id": agent_a,
|
|
"user_id": str(creator_id) if creator_id else str(agent_a),
|
|
"title": "Agent Conversation",
|
|
"participant_id": str(part_a) if part_a else None,
|
|
"peer_agent_id": str(agent_b),
|
|
"created_at": times[0],
|
|
"last_msg_at": times[1],
|
|
})
|
|
|
|
# Copy messages into chat_messages
|
|
msgs = conn.execute(sa.text("""
|
|
SELECT sender_id, content, created_at FROM messages
|
|
WHERE sender_type = 'agent' AND receiver_type = 'agent'
|
|
AND ((sender_id = :a AND receiver_id = :b) OR (sender_id = :b AND receiver_id = :a))
|
|
ORDER BY created_at
|
|
"""), {"a": str(agent_a), "b": str(agent_b)}).fetchall()
|
|
|
|
for sender_id, content, created_at in msgs:
|
|
sender_part = conn.execute(sa.text(
|
|
"SELECT id FROM participants WHERE type = 'agent' AND ref_id = :ref"
|
|
), {"ref": str(sender_id)}).scalar()
|
|
|
|
conn.execute(sa.text("""
|
|
INSERT INTO chat_messages (id, agent_id, user_id, role, content, conversation_id, participant_id, created_at)
|
|
VALUES (:id, :agent_id, :user_id, :role, :content, :conv_id, :part_id, :created_at)
|
|
"""), {
|
|
"id": str(uuid.uuid4()),
|
|
"agent_id": str(agent_a),
|
|
"user_id": str(creator_id) if creator_id else str(agent_a),
|
|
"role": "assistant" if str(sender_id) == str(agent_a) else "user",
|
|
"content": content,
|
|
"conv_id": session_id,
|
|
"part_id": str(sender_part) if sender_part else None,
|
|
"created_at": created_at,
|
|
})
|
|
|
|
# 9. Drop messages table
|
|
op.drop_table("messages")
|
|
|
|
# Drop the enum types used by messages table
|
|
try:
|
|
conn.execute(sa.text("DROP TYPE IF EXISTS msg_participant_type_enum"))
|
|
conn.execute(sa.text("DROP TYPE IF EXISTS msg_type_enum"))
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def downgrade() -> None:
|
|
# Remove new columns
|
|
op.drop_column("chat_messages", "participant_id")
|
|
op.drop_column("chat_sessions", "peer_agent_id")
|
|
op.drop_column("chat_sessions", "participant_id")
|
|
op.drop_table("participants")
|