mirror of
https://github.com/renee-png/acmcc.git
synced 2026-06-21 09:50:01 +00:00
183fe0a93c
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
364 lines
13 KiB
TypeScript
364 lines
13 KiB
TypeScript
import { sendLovableEmail } from 'npm:@lovable.dev/email-js'
|
|
import { createClient } from 'npm:@supabase/supabase-js@2'
|
|
|
|
const MAX_RETRIES = 5
|
|
const DEFAULT_BATCH_SIZE = 10
|
|
const DEFAULT_SEND_DELAY_MS = 200
|
|
const DEFAULT_AUTH_TTL_MINUTES = 15
|
|
const DEFAULT_TRANSACTIONAL_TTL_MINUTES = 60
|
|
|
|
// Check if an error is a rate-limit (429) response.
|
|
// Uses EmailAPIError.status when available (email-js >=0.x with structured errors),
|
|
// falls back to parsing the error message for older versions.
|
|
function isRateLimited(error: unknown): boolean {
|
|
if (error && typeof error === 'object' && 'status' in error) {
|
|
return (error as { status: number }).status === 429
|
|
}
|
|
return error instanceof Error && error.message.includes('429')
|
|
}
|
|
|
|
// Check if an error is a forbidden (403) response. Retrying won't help.
|
|
// Move straight to DLQ.
|
|
function isForbidden(error: unknown): boolean {
|
|
if (error && typeof error === 'object' && 'status' in error) {
|
|
return (error as { status: number }).status === 403
|
|
}
|
|
return error instanceof Error && error.message.includes('403')
|
|
}
|
|
|
|
// Extract Retry-After seconds from a structured EmailAPIError, or default to 60s.
|
|
function getRetryAfterSeconds(error: unknown): number {
|
|
if (error && typeof error === 'object' && 'retryAfterSeconds' in error) {
|
|
return (error as { retryAfterSeconds: number | null }).retryAfterSeconds ?? 60
|
|
}
|
|
return 60
|
|
}
|
|
|
|
function parseJwtClaims(token: string): Record<string, unknown> | null {
|
|
const parts = token.split('.')
|
|
if (parts.length < 2) {
|
|
return null
|
|
}
|
|
|
|
try {
|
|
const payload = parts[1]
|
|
.replaceAll('-', '+')
|
|
.replaceAll('_', '/')
|
|
.padEnd(Math.ceil(parts[1].length / 4) * 4, '=')
|
|
|
|
return JSON.parse(atob(payload)) as Record<string, unknown>
|
|
} catch {
|
|
return null
|
|
}
|
|
}
|
|
|
|
// Move a message to the dead letter queue and log the reason.
|
|
async function moveToDlq(
|
|
supabase: ReturnType<typeof createClient>,
|
|
queue: string,
|
|
msg: { msg_id: number; message: Record<string, unknown> },
|
|
reason: string
|
|
): Promise<void> {
|
|
const payload = msg.message
|
|
await supabase.from('email_send_log').insert({
|
|
message_id: payload.message_id,
|
|
template_name: (payload.label || queue) as string,
|
|
recipient_email: payload.to,
|
|
status: 'dlq',
|
|
error_message: reason,
|
|
})
|
|
const { error } = await supabase.rpc('move_to_dlq', {
|
|
source_queue: queue,
|
|
dlq_name: `${queue}_dlq`,
|
|
message_id: msg.msg_id,
|
|
payload,
|
|
})
|
|
if (error) {
|
|
console.error('Failed to move message to DLQ', { queue, msg_id: msg.msg_id, reason, error })
|
|
}
|
|
}
|
|
|
|
Deno.serve(async (req) => {
|
|
const apiKey = Deno.env.get('LOVABLE_API_KEY')
|
|
const supabaseUrl = Deno.env.get('SUPABASE_URL')
|
|
const supabaseServiceKey = Deno.env.get('SUPABASE_SERVICE_ROLE_KEY')
|
|
|
|
if (!apiKey || !supabaseUrl || !supabaseServiceKey) {
|
|
console.error('Missing required environment variables')
|
|
return new Response(
|
|
JSON.stringify({ error: 'Server configuration error' }),
|
|
{ status: 500, headers: { 'Content-Type': 'application/json' } }
|
|
)
|
|
}
|
|
|
|
const authHeader = req.headers.get('Authorization')
|
|
if (!authHeader?.startsWith('Bearer ')) {
|
|
return new Response(
|
|
JSON.stringify({ error: 'Unauthorized' }),
|
|
{ status: 401, headers: { 'Content-Type': 'application/json' } }
|
|
)
|
|
}
|
|
|
|
// Defense in depth: verify_jwt=true already requires a valid JWT at the
|
|
// gateway layer. This adds an explicit role check so only service-role
|
|
// callers can trigger queue processing.
|
|
const token = authHeader.slice('Bearer '.length).trim()
|
|
const claims = parseJwtClaims(token)
|
|
if (claims?.role !== 'service_role') {
|
|
return new Response(
|
|
JSON.stringify({ error: 'Forbidden' }),
|
|
{ status: 403, headers: { 'Content-Type': 'application/json' } }
|
|
)
|
|
}
|
|
|
|
const supabase = createClient(supabaseUrl, supabaseServiceKey)
|
|
|
|
// 1. Check rate-limit cooldown and read queue config
|
|
const { data: state } = await supabase
|
|
.from('email_send_state')
|
|
.select('retry_after_until, batch_size, send_delay_ms, auth_email_ttl_minutes, transactional_email_ttl_minutes')
|
|
.single()
|
|
|
|
if (state?.retry_after_until && new Date(state.retry_after_until) > new Date()) {
|
|
return new Response(
|
|
JSON.stringify({ skipped: true, reason: 'rate_limited' }),
|
|
{ headers: { 'Content-Type': 'application/json' } }
|
|
)
|
|
}
|
|
|
|
const batchSize = state?.batch_size ?? DEFAULT_BATCH_SIZE
|
|
const sendDelayMs = state?.send_delay_ms ?? DEFAULT_SEND_DELAY_MS
|
|
const ttlMinutes: Record<string, number> = {
|
|
auth_emails: state?.auth_email_ttl_minutes ?? DEFAULT_AUTH_TTL_MINUTES,
|
|
transactional_emails: state?.transactional_email_ttl_minutes ?? DEFAULT_TRANSACTIONAL_TTL_MINUTES,
|
|
}
|
|
|
|
let totalProcessed = 0
|
|
|
|
// 2. Process auth_emails first (priority), then transactional_emails
|
|
for (const queue of ['auth_emails', 'transactional_emails']) {
|
|
const { data: messages, error: readError } = await supabase.rpc('read_email_batch', {
|
|
queue_name: queue,
|
|
batch_size: batchSize,
|
|
vt: 30,
|
|
})
|
|
|
|
if (readError) {
|
|
console.error('Failed to read email batch', { queue, error: readError })
|
|
continue
|
|
}
|
|
|
|
if (!messages?.length) continue
|
|
|
|
// Retry budget is based on real send failures, not pgmq read_ct.
|
|
// read_ct increments for every message in a claimed batch, including
|
|
// messages not attempted when a 429 stops processing early.
|
|
const messageIds = Array.from(
|
|
new Set(
|
|
messages
|
|
.map((msg) =>
|
|
msg?.message?.message_id && typeof msg.message.message_id === 'string'
|
|
? msg.message.message_id
|
|
: null
|
|
)
|
|
.filter((id): id is string => Boolean(id))
|
|
)
|
|
)
|
|
const failedAttemptsByMessageId = new Map<string, number>()
|
|
if (messageIds.length > 0) {
|
|
const { data: failedRows, error: failedRowsError } = await supabase
|
|
.from('email_send_log')
|
|
.select('message_id')
|
|
.in('message_id', messageIds)
|
|
.eq('status', 'failed')
|
|
|
|
if (failedRowsError) {
|
|
console.error('Failed to load failed-attempt counters', {
|
|
queue,
|
|
error: failedRowsError,
|
|
})
|
|
} else {
|
|
for (const row of failedRows ?? []) {
|
|
const messageId = row?.message_id
|
|
if (typeof messageId !== 'string' || !messageId) continue
|
|
failedAttemptsByMessageId.set(
|
|
messageId,
|
|
(failedAttemptsByMessageId.get(messageId) ?? 0) + 1
|
|
)
|
|
}
|
|
}
|
|
}
|
|
|
|
for (let i = 0; i < messages.length; i++) {
|
|
const msg = messages[i]
|
|
const payload = msg.message
|
|
const failedAttempts =
|
|
payload?.message_id && typeof payload.message_id === 'string'
|
|
? (failedAttemptsByMessageId.get(payload.message_id) ?? 0)
|
|
: msg.read_ct ?? 0
|
|
|
|
// Drop expired messages (TTL exceeded).
|
|
// Prefer payload.queued_at when present; fall back to PGMQ's enqueued_at
|
|
// which is always set by the queue.
|
|
const queuedAt = payload.queued_at ?? msg.enqueued_at
|
|
if (queuedAt) {
|
|
const ageMs = Date.now() - new Date(queuedAt).getTime()
|
|
const maxAgeMs = ttlMinutes[queue] * 60 * 1000
|
|
if (ageMs > maxAgeMs) {
|
|
console.warn('Email expired (TTL exceeded)', {
|
|
queue,
|
|
msg_id: msg.msg_id,
|
|
queued_at: queuedAt,
|
|
ttl_minutes: ttlMinutes[queue],
|
|
})
|
|
await moveToDlq(supabase, queue, msg, `TTL exceeded (${ttlMinutes[queue]} minutes)`)
|
|
continue
|
|
}
|
|
}
|
|
|
|
// Move to DLQ if max failed send attempts reached.
|
|
if (failedAttempts >= MAX_RETRIES) {
|
|
await moveToDlq(supabase, queue, msg, `Max retries (${MAX_RETRIES}) exceeded (attempted ${failedAttempts} times)`)
|
|
continue
|
|
}
|
|
|
|
// Guard: skip if another worker already sent this message (VT expired race)
|
|
if (payload.message_id) {
|
|
const { data: alreadySent } = await supabase
|
|
.from('email_send_log')
|
|
.select('id')
|
|
.eq('message_id', payload.message_id)
|
|
.eq('status', 'sent')
|
|
.maybeSingle()
|
|
|
|
if (alreadySent) {
|
|
console.warn('Skipping duplicate send (already sent)', {
|
|
queue,
|
|
msg_id: msg.msg_id,
|
|
message_id: payload.message_id,
|
|
})
|
|
const { error: dupDelError } = await supabase.rpc('delete_email', {
|
|
queue_name: queue,
|
|
message_id: msg.msg_id,
|
|
})
|
|
if (dupDelError) {
|
|
console.error('Failed to delete duplicate message from queue', { queue, msg_id: msg.msg_id, error: dupDelError })
|
|
}
|
|
continue
|
|
}
|
|
}
|
|
|
|
try {
|
|
await sendLovableEmail(
|
|
{
|
|
run_id: payload.run_id,
|
|
to: payload.to,
|
|
from: payload.from,
|
|
sender_domain: payload.sender_domain,
|
|
subject: payload.subject,
|
|
html: payload.html,
|
|
text: payload.text,
|
|
purpose: payload.purpose,
|
|
label: payload.label,
|
|
idempotency_key: payload.idempotency_key,
|
|
unsubscribe_token: payload.unsubscribe_token,
|
|
message_id: payload.message_id,
|
|
},
|
|
// sendUrl is optional — when LOVABLE_SEND_URL is not set, the library
|
|
// falls back to the default Lovable API endpoint (https://api.lovable.dev).
|
|
// Set LOVABLE_SEND_URL as a Supabase secret to override (e.g. for local dev).
|
|
{ apiKey, sendUrl: Deno.env.get('LOVABLE_SEND_URL') }
|
|
)
|
|
|
|
// Log success
|
|
await supabase.from('email_send_log').insert({
|
|
message_id: payload.message_id,
|
|
template_name: payload.label || queue,
|
|
recipient_email: payload.to,
|
|
status: 'sent',
|
|
})
|
|
|
|
// Delete from queue
|
|
const { error: delError } = await supabase.rpc('delete_email', {
|
|
queue_name: queue,
|
|
message_id: msg.msg_id,
|
|
})
|
|
if (delError) {
|
|
console.error('Failed to delete sent message from queue', { queue, msg_id: msg.msg_id, error: delError })
|
|
}
|
|
totalProcessed++
|
|
} catch (error) {
|
|
const errorMsg = error instanceof Error ? error.message : String(error)
|
|
console.error('Email send failed', {
|
|
queue,
|
|
msg_id: msg.msg_id,
|
|
read_ct: msg.read_ct,
|
|
failed_attempts: failedAttempts,
|
|
error: errorMsg,
|
|
})
|
|
|
|
if (isRateLimited(error)) {
|
|
await supabase.from('email_send_log').insert({
|
|
message_id: payload.message_id,
|
|
template_name: payload.label || queue,
|
|
recipient_email: payload.to,
|
|
status: 'rate_limited',
|
|
error_message: errorMsg.slice(0, 1000),
|
|
})
|
|
|
|
const retryAfterSecs = getRetryAfterSeconds(error)
|
|
await supabase
|
|
.from('email_send_state')
|
|
.update({
|
|
retry_after_until: new Date(
|
|
Date.now() + retryAfterSecs * 1000
|
|
).toISOString(),
|
|
updated_at: new Date().toISOString(),
|
|
})
|
|
.eq('id', 1)
|
|
|
|
// Stop processing — remaining messages stay in queue (VT expires, retried next cycle)
|
|
return new Response(
|
|
JSON.stringify({ processed: totalProcessed, stopped: 'rate_limited' }),
|
|
{ headers: { 'Content-Type': 'application/json' } }
|
|
)
|
|
}
|
|
|
|
// 403s are permanent configuration or authorization failures for this
|
|
// message, so move straight to DLQ and stop processing the rest of the batch.
|
|
if (isForbidden(error)) {
|
|
await moveToDlq(supabase, queue, msg, errorMsg.slice(0, 1000))
|
|
return new Response(
|
|
JSON.stringify({ processed: totalProcessed, stopped: 'forbidden' }),
|
|
{ headers: { 'Content-Type': 'application/json' } }
|
|
)
|
|
}
|
|
|
|
// Log non-429 failures to track real retry attempts.
|
|
await supabase.from('email_send_log').insert({
|
|
message_id: payload.message_id,
|
|
template_name: payload.label || queue,
|
|
recipient_email: payload.to,
|
|
status: 'failed',
|
|
error_message: errorMsg.slice(0, 1000),
|
|
})
|
|
if (payload?.message_id && typeof payload.message_id === 'string') {
|
|
failedAttemptsByMessageId.set(payload.message_id, failedAttempts + 1)
|
|
}
|
|
|
|
// Non-429 errors: message stays invisible until VT expires, then retried
|
|
}
|
|
|
|
// Small delay between sends to smooth bursts
|
|
if (i < messages.length - 1) {
|
|
await new Promise((r) => setTimeout(r, sendDelayMs))
|
|
}
|
|
}
|
|
}
|
|
|
|
return new Response(
|
|
JSON.stringify({ processed: totalProcessed }),
|
|
{ headers: { 'Content-Type': 'application/json' } }
|
|
)
|
|
})
|