mirror of
https://github.com/renee-png/acmcc.git
synced 2026-06-21 09:50:01 +00:00
183fe0a93c
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
293 lines
11 KiB
PL/PgSQL
293 lines
11 KiB
PL/PgSQL
-- Email infrastructure
|
|
-- Creates the queue system, send log, send state, suppression, and unsubscribe
|
|
-- tables used by both auth and transactional emails.
|
|
|
|
-- Extensions the email pipeline relies on: pg_net, pg_cron, supabase_vault
-- and pgmq.
CREATE EXTENSION IF NOT EXISTS pg_net SCHEMA extensions;

-- pg_cron is installed via an explicit catalog lookup instead of
-- CREATE EXTENSION IF NOT EXISTS: the extension is created only when
-- pg_extension has no row named 'pg_cron'.
DO $$
BEGIN
    PERFORM 1 FROM pg_extension WHERE extname = 'pg_cron';
    IF NOT FOUND THEN
        CREATE EXTENSION pg_cron;
    END IF;
END
$$;

CREATE EXTENSION IF NOT EXISTS supabase_vault;
CREATE EXTENSION IF NOT EXISTS pgmq;
|
|
|
|
-- Create the email queues and their dead-letter queues:
--   auth_emails              (high priority)
--   transactional_emails     (normal priority)
--   auth_emails_dlq          (auth messages that exceed max retries)
--   transactional_emails_dlq (transactional messages that exceed max retries)
--
-- Each pgmq.create call runs in its own exception block, so a failure for one
-- queue (for example "queue already exists") aborts neither the migration nor
-- the remaining creates. The failure is reported as a WARNING rather than
-- discarded, so genuine problems (pgmq missing, insufficient privileges, ...)
-- remain visible in the migration output.
DO $$
DECLARE
    q TEXT;
BEGIN
    FOREACH q IN ARRAY ARRAY[
        'auth_emails',
        'transactional_emails',
        'auth_emails_dlq',
        'transactional_emails_dlq'
    ]
    LOOP
        BEGIN
            PERFORM pgmq.create(q);
        EXCEPTION WHEN OTHERS THEN
            -- Best-effort: keep going, but say what went wrong.
            RAISE WARNING 'pgmq.create(%) did not complete: [%] %', q, SQLSTATE, SQLERRM;
        END;
    END LOOP;
END
$$;
|
|
|
|
-- Audit trail: one row per email send attempt.
-- The service role may also UPDATE rows so that the suppression edge function
-- can change a record's status after a bounce, complaint or unsubscribe.
CREATE TABLE IF NOT EXISTS public.email_send_log (
    id              UUID        PRIMARY KEY DEFAULT gen_random_uuid(),
    message_id      TEXT,
    template_name   TEXT        NOT NULL,
    recipient_email TEXT        NOT NULL,
    -- The constraint name is spelled out because a later backfill step drops
    -- and re-adds it by this exact name.
    status          TEXT        NOT NULL
        CONSTRAINT email_send_log_status_check
        CHECK (status IN ('pending', 'sent', 'suppressed', 'failed', 'bounced', 'complained', 'dlq')),
    error_message   TEXT,
    metadata        JSONB,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT now()
);
|
|
|
|
-- Row-level security for the send log. One policy per permitted command
-- (SELECT, INSERT, UPDATE), each requiring auth.role() = 'service_role'.
-- No DELETE policy is defined. Every CREATE POLICY sits in its own block that
-- ignores duplicate_object, which keeps the migration re-runnable.
ALTER TABLE public.email_send_log ENABLE ROW LEVEL SECURITY;

DO $$
BEGIN
    CREATE POLICY "Service role can read send log"
        ON public.email_send_log
        FOR SELECT
        USING (auth.role() = 'service_role');
EXCEPTION
    WHEN duplicate_object THEN NULL;
END
$$;

DO $$
BEGIN
    CREATE POLICY "Service role can insert send log"
        ON public.email_send_log
        FOR INSERT
        WITH CHECK (auth.role() = 'service_role');
EXCEPTION
    WHEN duplicate_object THEN NULL;
END
$$;

DO $$
BEGIN
    CREATE POLICY "Service role can update send log"
        ON public.email_send_log
        FOR UPDATE
        USING (auth.role() = 'service_role')
        WITH CHECK (auth.role() = 'service_role');
EXCEPTION
    WHEN duplicate_object THEN NULL;
END
$$;
|
|
|
|
-- Lookup indexes for the send log: newest-first by creation time, and by
-- recipient address.
CREATE INDEX IF NOT EXISTS idx_email_send_log_created
    ON public.email_send_log (created_at DESC);

CREATE INDEX IF NOT EXISTS idx_email_send_log_recipient
    ON public.email_send_log (recipient_email);

-- Backfill for send-log tables created before message_id existed: add the
-- column, treating "column already exists" as success.
DO $$
BEGIN
    ALTER TABLE public.email_send_log
        ADD COLUMN message_id TEXT;
EXCEPTION
    WHEN duplicate_column THEN NULL;
END
$$;

-- Created after the backfill so the column is guaranteed to be present.
CREATE INDEX IF NOT EXISTS idx_email_send_log_message
    ON public.email_send_log (message_id);
|
|
|
|
-- Duplicate-send guard: a partial unique index that allows at most one row
-- with status = 'sent' per message_id. The pre-send check is the primary
-- defence when a message's VT expires and a second worker picks it up; this
-- index is the database-level safety net for the remaining race conditions.
CREATE UNIQUE INDEX IF NOT EXISTS idx_email_send_log_message_sent_unique
    ON public.email_send_log (message_id)
    WHERE status = 'sent';
|
|
|
|
-- Backfill for send-log tables whose status CHECK predates the newer status
-- values: drop the constraint (if present) and re-add it with the current
-- list. Both statements live in one DO block.
DO $$
BEGIN
    ALTER TABLE public.email_send_log
        DROP CONSTRAINT IF EXISTS email_send_log_status_check;

    ALTER TABLE public.email_send_log
        ADD CONSTRAINT email_send_log_status_check
        CHECK (status IN ('pending', 'sent', 'suppressed', 'failed', 'bounced', 'complained', 'dlq'));
END
$$;
|
|
|
|
-- Rate-limit state and queue configuration. The table holds a single row:
-- id defaults to 1 and the CHECK rejects any other value. It tracks the
-- Retry-After cooldown (retry_after_until) plus the throughput settings.
CREATE TABLE IF NOT EXISTS public.email_send_state (
    id                              INT         PRIMARY KEY DEFAULT 1 CHECK (id = 1),
    retry_after_until               TIMESTAMPTZ,
    batch_size                      INTEGER     NOT NULL DEFAULT 10,
    send_delay_ms                   INTEGER     NOT NULL DEFAULT 200,
    auth_email_ttl_minutes          INTEGER     NOT NULL DEFAULT 15,
    transactional_email_ttl_minutes INTEGER     NOT NULL DEFAULT 60,
    updated_at                      TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- Seed the single row; a no-op when it is already there.
INSERT INTO public.email_send_state (id)
VALUES (1)
ON CONFLICT DO NOTHING;
|
|
|
|
-- Backfill for email_send_state tables created before the config columns
-- existed. Each column is added in its own block, and "column already exists"
-- is treated as success so the migration can be re-run.
DO $$
BEGIN
    ALTER TABLE public.email_send_state
        ADD COLUMN batch_size INTEGER NOT NULL DEFAULT 10;
EXCEPTION
    WHEN duplicate_column THEN NULL;
END
$$;

DO $$
BEGIN
    ALTER TABLE public.email_send_state
        ADD COLUMN send_delay_ms INTEGER NOT NULL DEFAULT 200;
EXCEPTION
    WHEN duplicate_column THEN NULL;
END
$$;

DO $$
BEGIN
    ALTER TABLE public.email_send_state
        ADD COLUMN auth_email_ttl_minutes INTEGER NOT NULL DEFAULT 15;
EXCEPTION
    WHEN duplicate_column THEN NULL;
END
$$;

DO $$
BEGIN
    ALTER TABLE public.email_send_state
        ADD COLUMN transactional_email_ttl_minutes INTEGER NOT NULL DEFAULT 60;
EXCEPTION
    WHEN duplicate_column THEN NULL;
END
$$;
|
|
|
|
-- Row-level security for the send state: a single FOR ALL policy requiring
-- auth.role() = 'service_role' for both reading and writing rows.
ALTER TABLE public.email_send_state ENABLE ROW LEVEL SECURITY;

DO $$
BEGIN
    CREATE POLICY "Service role can manage send state"
        ON public.email_send_state
        FOR ALL
        USING (auth.role() = 'service_role')
        WITH CHECK (auth.role() = 'service_role');
EXCEPTION
    -- The policy already exists from an earlier run.
    WHEN duplicate_object THEN NULL;
END
$$;
|
|
|
|
-- RPC wrappers so Edge Functions can interact with pgmq via supabase.rpc()
-- (PostgREST only exposes functions in the public schema; pgmq functions are
-- in the pgmq schema).
-- Handling of a missing queue (undefined_table, 42P01) differs per wrapper:
--   enqueue_email    creates the queue and retries the send
--   read_email_batch creates the queue and returns no rows
--   delete_email     returns FALSE (nothing is created)
--   move_to_dlq      creates the DLQ and retries the move
-- so emails are not lost if a queue was dropped (extension upgrade, restore, etc.).

-- Sends `payload` to the pgmq queue `queue_name` and returns the new message id.
-- If the queue does not exist, it is created and the send is retried once.
--
-- SECURITY DEFINER functions run with the owner's privileges, so the
-- search_path is pinned to empty: every object the body touches is already
-- schema-qualified, and a caller-controlled search_path cannot redirect it.
CREATE OR REPLACE FUNCTION public.enqueue_email(queue_name TEXT, payload JSONB)
RETURNS BIGINT
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = ''
AS $$
BEGIN
    RETURN pgmq.send(queue_name, payload);
EXCEPTION
    WHEN undefined_table THEN
        -- Queue is missing: recreate it, then retry the send.
        PERFORM pgmq.create(queue_name);
        RETURN pgmq.send(queue_name, payload);
END;
$$;
|
|
|
|
-- Reads messages from the pgmq queue `queue_name` and returns msg_id, read_ct
-- and message for each one. `batch_size` and `vt` are forwarded to pgmq.read
-- as its third and second arguments respectively.
-- If the queue does not exist, it is created and an empty result is returned.
--
-- SECURITY DEFINER functions run with the owner's privileges, so the
-- search_path is pinned to empty: every object the body touches is already
-- schema-qualified, and a caller-controlled search_path cannot redirect it.
CREATE OR REPLACE FUNCTION public.read_email_batch(queue_name TEXT, batch_size INT, vt INT)
RETURNS TABLE(msg_id BIGINT, read_ct INT, message JSONB)
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = ''
AS $$
BEGIN
    -- Columns are qualified with the alias r because the output column names
    -- (msg_id, read_ct, message) are also variables inside this function.
    RETURN QUERY
    SELECT r.msg_id, r.read_ct, r.message
    FROM pgmq.read(queue_name, vt, batch_size) AS r;
EXCEPTION
    WHEN undefined_table THEN
        -- Queue is missing: recreate it; there is nothing to read yet.
        PERFORM pgmq.create(queue_name);
        RETURN;
END;
$$;
|
|
|
|
-- Deletes message `message_id` from the pgmq queue `queue_name` and returns
-- the result of pgmq.delete. Returns FALSE when the queue does not exist
-- (undefined_table); the queue is not created in that case.
--
-- SECURITY DEFINER functions run with the owner's privileges, so the
-- search_path is pinned to empty: every object the body touches is already
-- schema-qualified, and a caller-controlled search_path cannot redirect it.
CREATE OR REPLACE FUNCTION public.delete_email(queue_name TEXT, message_id BIGINT)
RETURNS BOOLEAN
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = ''
AS $$
BEGIN
    RETURN pgmq.delete(queue_name, message_id);
EXCEPTION
    WHEN undefined_table THEN
        RETURN FALSE;
END;
$$;
|
|
|
|
-- Moves a message to a dead-letter queue: sends `payload` to `dlq_name`, then
-- deletes `message_id` from `source_queue`. Returns the id of the new DLQ
-- message.
--
-- If either step raises undefined_table, PL/pgSQL rolls back what the main
-- block did and the handler takes over:
--   1. try to create dlq_name, ignoring any error from the create;
--   2. send payload to dlq_name again;
--   3. retry the delete, ignoring undefined_table (source queue missing).
--
-- SECURITY DEFINER functions run with the owner's privileges, so the
-- search_path is pinned to empty: every object the body touches is already
-- schema-qualified, and a caller-controlled search_path cannot redirect it.
CREATE OR REPLACE FUNCTION public.move_to_dlq(
    source_queue TEXT,
    dlq_name     TEXT,
    message_id   BIGINT,
    payload      JSONB
)
RETURNS BIGINT
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = ''
AS $$
DECLARE
    new_id BIGINT;
BEGIN
    SELECT pgmq.send(dlq_name, payload) INTO new_id;
    PERFORM pgmq.delete(source_queue, message_id);
    RETURN new_id;
EXCEPTION
    WHEN undefined_table THEN
        -- Either queue may be the missing one, so the create is best-effort.
        BEGIN
            PERFORM pgmq.create(dlq_name);
        EXCEPTION
            WHEN OTHERS THEN NULL;
        END;

        SELECT pgmq.send(dlq_name, payload) INTO new_id;

        -- The source queue may be the one that is gone; the message is
        -- already in the DLQ, so a missing source is not an error here.
        BEGIN
            PERFORM pgmq.delete(source_queue, message_id);
        EXCEPTION
            WHEN undefined_table THEN NULL;
        END;

        RETURN new_id;
END;
$$;
|
|
|
|
-- Restrict the queue RPC wrappers to service_role only. They are SECURITY
-- DEFINER (run as the owner), so any role that can execute them can
-- manipulate the email queues.
--
-- Revoking from PUBLIC alone is not enough on Supabase: default privileges
-- grant EXECUTE on new public-schema functions directly to anon and
-- authenticated, and those direct grants survive a REVOKE ... FROM PUBLIC.
-- They are therefore revoked explicitly as well.
REVOKE EXECUTE ON FUNCTION public.enqueue_email(TEXT, JSONB) FROM PUBLIC, anon, authenticated;
GRANT EXECUTE ON FUNCTION public.enqueue_email(TEXT, JSONB) TO service_role;

REVOKE EXECUTE ON FUNCTION public.read_email_batch(TEXT, INT, INT) FROM PUBLIC, anon, authenticated;
GRANT EXECUTE ON FUNCTION public.read_email_batch(TEXT, INT, INT) TO service_role;

REVOKE EXECUTE ON FUNCTION public.delete_email(TEXT, BIGINT) FROM PUBLIC, anon, authenticated;
GRANT EXECUTE ON FUNCTION public.delete_email(TEXT, BIGINT) TO service_role;

REVOKE EXECUTE ON FUNCTION public.move_to_dlq(TEXT, TEXT, BIGINT, JSONB) FROM PUBLIC, anon, authenticated;
GRANT EXECUTE ON FUNCTION public.move_to_dlq(TEXT, TEXT, BIGINT, JSONB) TO service_role;
|
|
|
|
-- Suppression list: addresses that must not be emailed, with the reason
-- (unsubscribe, bounce or complaint). One row per address (UNIQUE on email).
-- Intended to be append-only: no UPDATE or DELETE policies are defined for
-- this table, so suppression cannot be bypassed through those commands.
CREATE TABLE IF NOT EXISTS public.suppressed_emails (
    id         UUID        PRIMARY KEY DEFAULT gen_random_uuid(),
    email      TEXT        NOT NULL,
    reason     TEXT        NOT NULL CHECK (reason IN ('unsubscribe', 'bounce', 'complaint')),
    metadata   JSONB,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    UNIQUE (email)
);
|
|
|
|
-- Row-level security for the suppression list: SELECT and INSERT policies
-- only, each requiring auth.role() = 'service_role'. Every CREATE POLICY is
-- wrapped so that an already-existing policy (duplicate_object) is ignored.
ALTER TABLE public.suppressed_emails ENABLE ROW LEVEL SECURITY;

DO $$
BEGIN
    CREATE POLICY "Service role can read suppressed emails"
        ON public.suppressed_emails
        FOR SELECT
        USING (auth.role() = 'service_role');
EXCEPTION
    WHEN duplicate_object THEN NULL;
END
$$;

DO $$
BEGIN
    CREATE POLICY "Service role can insert suppressed emails"
        ON public.suppressed_emails
        FOR INSERT
        WITH CHECK (auth.role() = 'service_role');
EXCEPTION
    WHEN duplicate_object THEN NULL;
END
$$;

-- NOTE(review): the table's UNIQUE (email) constraint is already backed by an
-- index on email, so this one looks redundant; confirm before removing.
CREATE INDEX IF NOT EXISTS idx_suppressed_emails_email
    ON public.suppressed_emails (email);
|
|
|
|
-- Unsubscribe tokens used in unsubscribe links: one token per email address
-- (both token and email are UNIQUE). used_at stays NULL until it is set.
-- No DELETE policy is defined for this table, so tokens cannot be removed
-- through RLS-governed access. An UPDATE policy exists for marking tokens as
-- used; the policy itself does not limit which columns may change.
CREATE TABLE IF NOT EXISTS public.email_unsubscribe_tokens (
    id         UUID        PRIMARY KEY DEFAULT gen_random_uuid(),
    token      TEXT        NOT NULL UNIQUE,
    email      TEXT        NOT NULL UNIQUE,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    used_at    TIMESTAMPTZ
);
|
|
|
|
-- Row-level security for unsubscribe tokens: SELECT, INSERT and UPDATE
-- policies, each requiring auth.role() = 'service_role'; no DELETE policy.
-- Every CREATE POLICY is wrapped so that an already-existing policy
-- (duplicate_object) is ignored on re-runs.
ALTER TABLE public.email_unsubscribe_tokens ENABLE ROW LEVEL SECURITY;

DO $$
BEGIN
    CREATE POLICY "Service role can read tokens"
        ON public.email_unsubscribe_tokens
        FOR SELECT
        USING (auth.role() = 'service_role');
EXCEPTION
    WHEN duplicate_object THEN NULL;
END
$$;

DO $$
BEGIN
    CREATE POLICY "Service role can insert tokens"
        ON public.email_unsubscribe_tokens
        FOR INSERT
        WITH CHECK (auth.role() = 'service_role');
EXCEPTION
    WHEN duplicate_object THEN NULL;
END
$$;

DO $$
BEGIN
    CREATE POLICY "Service role can mark tokens as used"
        ON public.email_unsubscribe_tokens
        FOR UPDATE
        USING (auth.role() = 'service_role')
        WITH CHECK (auth.role() = 'service_role');
EXCEPTION
    WHEN duplicate_object THEN NULL;
END
$$;

-- NOTE(review): the token column is declared UNIQUE, which is already backed
-- by an index, so this one looks redundant; confirm before removing.
CREATE INDEX IF NOT EXISTS idx_unsubscribe_tokens_token
    ON public.email_unsubscribe_tokens (token);
|
|
|
|
-- ============================================================
|
|
-- POST-MIGRATION STEPS (applied dynamically by setup_email_infra)
|
|
-- These steps contain project-specific secrets and URLs and
|
|
-- cannot be expressed as static SQL. They are applied via the
|
|
-- Supabase Management API (ExecuteSQL) each time the tool runs.
|
|
-- ============================================================
|
|
--
|
|
-- 1. VAULT SECRET
|
|
-- Stores (or updates) the Supabase service_role key in
|
|
-- vault as 'email_queue_service_role_key'.
|
|
-- Uses vault.create_secret / vault.update_secret (upsert).
|
|
-- To revert: DELETE FROM vault.secrets WHERE name = 'email_queue_service_role_key';
|
|
--
|
|
-- 2. CRON JOB (pg_cron)
|
|
-- Creates job 'process-email-queue' with a 5-second interval.
|
|
-- The job checks:
|
|
-- a) rate-limit cooldown (email_send_state.retry_after_until)
|
|
-- b) whether auth_emails or transactional_emails queues have messages
|
|
-- If conditions are met, it calls the process-email-queue Edge Function
|
|
-- via net.http_post using the vault-stored service_role key.
|
|
-- To revert: SELECT cron.unschedule('process-email-queue');
|