-- Email infrastructure -- Creates the queue system, send log, send state, suppression, and unsubscribe -- tables used by both auth and transactional emails. -- Extensions required for queue processing CREATE EXTENSION IF NOT EXISTS pg_net SCHEMA extensions; DO $$ BEGIN IF NOT EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'pg_cron') THEN CREATE EXTENSION pg_cron; END IF; END $$; CREATE EXTENSION IF NOT EXISTS supabase_vault; CREATE EXTENSION IF NOT EXISTS pgmq; -- Create email queues (auth = high priority, transactional = normal) -- Wrapped in DO blocks to handle "queue already exists" errors idempotently. DO $$ BEGIN PERFORM pgmq.create('auth_emails'); EXCEPTION WHEN OTHERS THEN NULL; END $$; DO $$ BEGIN PERFORM pgmq.create('transactional_emails'); EXCEPTION WHEN OTHERS THEN NULL; END $$; -- Dead-letter queues for messages that exceed max retries DO $$ BEGIN PERFORM pgmq.create('auth_emails_dlq'); EXCEPTION WHEN OTHERS THEN NULL; END $$; DO $$ BEGIN PERFORM pgmq.create('transactional_emails_dlq'); EXCEPTION WHEN OTHERS THEN NULL; END $$; -- Email send log table (audit trail for all send attempts) -- UPDATE is allowed for the service role so the suppression edge function -- can update a log record's status when a bounce/complaint/unsubscribe occurs. CREATE TABLE IF NOT EXISTS public.email_send_log ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), message_id TEXT, template_name TEXT NOT NULL, recipient_email TEXT NOT NULL, status TEXT NOT NULL CHECK (status IN ('pending', 'sent', 'suppressed', 'failed', 'bounced', 'complained', 'dlq')), error_message TEXT, metadata JSONB, created_at TIMESTAMPTZ NOT NULL DEFAULT now() ); ALTER TABLE public.email_send_log ENABLE ROW LEVEL SECURITY; DO $$ BEGIN CREATE POLICY "Service role can read send log" ON public.email_send_log FOR SELECT USING (auth.role() = 'service_role'); EXCEPTION WHEN duplicate_object THEN NULL; END $$; DO $$ BEGIN CREATE POLICY "Service role can insert send log" ON public.email_send_log FOR INSERT WITH CHECK (auth.role() = 'service_role'); EXCEPTION WHEN duplicate_object THEN NULL; END $$; DO $$ BEGIN CREATE POLICY "Service role can update send log" ON public.email_send_log FOR UPDATE USING (auth.role() = 'service_role') WITH CHECK (auth.role() = 'service_role'); EXCEPTION WHEN duplicate_object THEN NULL; END $$; CREATE INDEX IF NOT EXISTS idx_email_send_log_created ON public.email_send_log(created_at DESC); CREATE INDEX IF NOT EXISTS idx_email_send_log_recipient ON public.email_send_log(recipient_email); -- Backfill: add message_id column to existing tables that predate this migration DO $$ BEGIN ALTER TABLE public.email_send_log ADD COLUMN message_id TEXT; EXCEPTION WHEN duplicate_column THEN NULL; END $$; CREATE INDEX IF NOT EXISTS idx_email_send_log_message ON public.email_send_log(message_id); -- Prevent duplicate sends: only one 'sent' row per message_id. -- If VT expires and another worker picks up the same message, the pre-send -- check catches it. This index is a DB-level safety net for race conditions. CREATE UNIQUE INDEX IF NOT EXISTS idx_email_send_log_message_sent_unique ON public.email_send_log(message_id) WHERE status = 'sent'; -- Backfill: update status CHECK constraint for existing tables that predate new statuses DO $$ BEGIN ALTER TABLE public.email_send_log DROP CONSTRAINT IF EXISTS email_send_log_status_check; ALTER TABLE public.email_send_log ADD CONSTRAINT email_send_log_status_check CHECK (status IN ('pending', 'sent', 'suppressed', 'failed', 'bounced', 'complained', 'dlq')); END $$; -- Rate-limit state and queue config (single row, tracks Retry-After cooldown + throughput settings) CREATE TABLE IF NOT EXISTS public.email_send_state ( id INT PRIMARY KEY DEFAULT 1 CHECK (id = 1), retry_after_until TIMESTAMPTZ, batch_size INTEGER NOT NULL DEFAULT 10, send_delay_ms INTEGER NOT NULL DEFAULT 200, auth_email_ttl_minutes INTEGER NOT NULL DEFAULT 15, transactional_email_ttl_minutes INTEGER NOT NULL DEFAULT 60, updated_at TIMESTAMPTZ NOT NULL DEFAULT now() ); INSERT INTO public.email_send_state (id) VALUES (1) ON CONFLICT DO NOTHING; -- Backfill: add config columns to existing tables that predate this migration DO $$ BEGIN ALTER TABLE public.email_send_state ADD COLUMN batch_size INTEGER NOT NULL DEFAULT 10; EXCEPTION WHEN duplicate_column THEN NULL; END $$; DO $$ BEGIN ALTER TABLE public.email_send_state ADD COLUMN send_delay_ms INTEGER NOT NULL DEFAULT 200; EXCEPTION WHEN duplicate_column THEN NULL; END $$; DO $$ BEGIN ALTER TABLE public.email_send_state ADD COLUMN auth_email_ttl_minutes INTEGER NOT NULL DEFAULT 15; EXCEPTION WHEN duplicate_column THEN NULL; END $$; DO $$ BEGIN ALTER TABLE public.email_send_state ADD COLUMN transactional_email_ttl_minutes INTEGER NOT NULL DEFAULT 60; EXCEPTION WHEN duplicate_column THEN NULL; END $$; ALTER TABLE public.email_send_state ENABLE ROW LEVEL SECURITY; DO $$ BEGIN CREATE POLICY "Service role can manage send state" ON public.email_send_state FOR ALL USING (auth.role() = 'service_role') WITH CHECK (auth.role() = 'service_role'); EXCEPTION WHEN duplicate_object THEN NULL; END $$; -- RPC wrappers so Edge Functions can interact with pgmq via supabase.rpc() -- (PostgREST only exposes functions in the public schema; pgmq functions are in the pgmq schema) -- All wrappers auto-create the queue on undefined_table (42P01) so emails -- are never lost if the queue was dropped (extension upgrade, restore, etc.). CREATE OR REPLACE FUNCTION public.enqueue_email(queue_name TEXT, payload JSONB) RETURNS BIGINT LANGUAGE plpgsql SECURITY DEFINER AS $$ BEGIN RETURN pgmq.send(queue_name, payload); EXCEPTION WHEN undefined_table THEN PERFORM pgmq.create(queue_name); RETURN pgmq.send(queue_name, payload); END; $$; CREATE OR REPLACE FUNCTION public.read_email_batch(queue_name TEXT, batch_size INT, vt INT) RETURNS TABLE(msg_id BIGINT, read_ct INT, message JSONB) LANGUAGE plpgsql SECURITY DEFINER AS $$ BEGIN RETURN QUERY SELECT r.msg_id, r.read_ct, r.message FROM pgmq.read(queue_name, vt, batch_size) r; EXCEPTION WHEN undefined_table THEN PERFORM pgmq.create(queue_name); RETURN; END; $$; CREATE OR REPLACE FUNCTION public.delete_email(queue_name TEXT, message_id BIGINT) RETURNS BOOLEAN LANGUAGE plpgsql SECURITY DEFINER AS $$ BEGIN RETURN pgmq.delete(queue_name, message_id); EXCEPTION WHEN undefined_table THEN RETURN FALSE; END; $$; CREATE OR REPLACE FUNCTION public.move_to_dlq( source_queue TEXT, dlq_name TEXT, message_id BIGINT, payload JSONB ) RETURNS BIGINT LANGUAGE plpgsql SECURITY DEFINER AS $$ DECLARE new_id BIGINT; BEGIN SELECT pgmq.send(dlq_name, payload) INTO new_id; PERFORM pgmq.delete(source_queue, message_id); RETURN new_id; EXCEPTION WHEN undefined_table THEN BEGIN PERFORM pgmq.create(dlq_name); EXCEPTION WHEN OTHERS THEN NULL; END; SELECT pgmq.send(dlq_name, payload) INTO new_id; BEGIN PERFORM pgmq.delete(source_queue, message_id); EXCEPTION WHEN undefined_table THEN NULL; END; RETURN new_id; END; $$; -- Restrict queue RPC wrappers to service_role only (SECURITY DEFINER runs as owner, -- so without this any authenticated user could manipulate the email queues) REVOKE EXECUTE ON FUNCTION public.enqueue_email(TEXT, JSONB) FROM PUBLIC; GRANT EXECUTE ON FUNCTION public.enqueue_email(TEXT, JSONB) TO service_role; REVOKE EXECUTE ON FUNCTION public.read_email_batch(TEXT, INT, INT) FROM PUBLIC; GRANT EXECUTE ON FUNCTION public.read_email_batch(TEXT, INT, INT) TO service_role; REVOKE EXECUTE ON FUNCTION public.delete_email(TEXT, BIGINT) FROM PUBLIC; GRANT EXECUTE ON FUNCTION public.delete_email(TEXT, BIGINT) TO service_role; REVOKE EXECUTE ON FUNCTION public.move_to_dlq(TEXT, TEXT, BIGINT, JSONB) FROM PUBLIC; GRANT EXECUTE ON FUNCTION public.move_to_dlq(TEXT, TEXT, BIGINT, JSONB) TO service_role; -- Suppressed emails table (tracks unsubscribes, bounces, complaints) -- Append-only: no DELETE or UPDATE policies to prevent bypassing suppression. CREATE TABLE IF NOT EXISTS public.suppressed_emails ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), email TEXT NOT NULL, reason TEXT NOT NULL CHECK (reason IN ('unsubscribe', 'bounce', 'complaint')), metadata JSONB, created_at TIMESTAMPTZ NOT NULL DEFAULT now(), UNIQUE(email) ); ALTER TABLE public.suppressed_emails ENABLE ROW LEVEL SECURITY; DO $$ BEGIN CREATE POLICY "Service role can read suppressed emails" ON public.suppressed_emails FOR SELECT USING (auth.role() = 'service_role'); EXCEPTION WHEN duplicate_object THEN NULL; END $$; DO $$ BEGIN CREATE POLICY "Service role can insert suppressed emails" ON public.suppressed_emails FOR INSERT WITH CHECK (auth.role() = 'service_role'); EXCEPTION WHEN duplicate_object THEN NULL; END $$; CREATE INDEX IF NOT EXISTS idx_suppressed_emails_email ON public.suppressed_emails(email); -- Email unsubscribe tokens table (one token per email address for unsubscribe links) -- No DELETE policy to prevent removing tokens. UPDATE allowed only to mark tokens as used. CREATE TABLE IF NOT EXISTS public.email_unsubscribe_tokens ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), token TEXT NOT NULL UNIQUE, email TEXT NOT NULL UNIQUE, created_at TIMESTAMPTZ NOT NULL DEFAULT now(), used_at TIMESTAMPTZ ); ALTER TABLE public.email_unsubscribe_tokens ENABLE ROW LEVEL SECURITY; DO $$ BEGIN CREATE POLICY "Service role can read tokens" ON public.email_unsubscribe_tokens FOR SELECT USING (auth.role() = 'service_role'); EXCEPTION WHEN duplicate_object THEN NULL; END $$; DO $$ BEGIN CREATE POLICY "Service role can insert tokens" ON public.email_unsubscribe_tokens FOR INSERT WITH CHECK (auth.role() = 'service_role'); EXCEPTION WHEN duplicate_object THEN NULL; END $$; DO $$ BEGIN CREATE POLICY "Service role can mark tokens as used" ON public.email_unsubscribe_tokens FOR UPDATE USING (auth.role() = 'service_role') WITH CHECK (auth.role() = 'service_role'); EXCEPTION WHEN duplicate_object THEN NULL; END $$; CREATE INDEX IF NOT EXISTS idx_unsubscribe_tokens_token ON public.email_unsubscribe_tokens(token); -- ============================================================ -- POST-MIGRATION STEPS (applied dynamically by setup_email_infra) -- These steps contain project-specific secrets and URLs and -- cannot be expressed as static SQL. They are applied via the -- Supabase Management API (ExecuteSQL) each time the tool runs. -- ============================================================ -- -- 1. VAULT SECRET -- Stores (or updates) the Supabase service_role key in -- vault as 'email_queue_service_role_key'. -- Uses vault.create_secret / vault.update_secret (upsert). -- To revert: DELETE FROM vault.secrets WHERE name = 'email_queue_service_role_key'; -- -- 2. CRON JOB (pg_cron) -- Creates job 'process-email-queue' with a 5-second interval. -- The job checks: -- a) rate-limit cooldown (email_send_state.retry_after_until) -- b) whether auth_emails or transactional_emails queues have messages -- If conditions are met, it calls the process-email-queue Edge Function -- via net.http_post using the vault-stored service_role key. -- To revert: SELECT cron.unschedule('process-email-queue');