import { createClient } from 'npm:@supabase/supabase-js@2' import { sendSmtpMessage, type SmtpSender } from '../_shared/smtp-send.ts' import { sendViaHostingerMail, type HostingerMailConfig } from '../_shared/hostinger-mail.ts' // Which sender identity automated email is sent from. Must be an active row in // public.email_senders that the SMTP account is authorized to send as. Override // per-environment with the AUTOMATED_EMAIL_FROM secret. // NOTE: no-reply@avriamail.com (Office365) is the sender with currently-valid // SMTP credentials — it is the address that has actually been delivering mail. // The Hostinger mail@avriacam.com mailbox rejects its stored password (SMTP 535) // and must have its password re-entered in Email Settings before it can be used. const DEFAULT_AUTOMATED_FROM = 'no-reply@avriamail.com' // Load (and shape) the SMTP sender used for all queued automated email. async function loadAutomatedSender( supabase: ReturnType ): Promise { const fromEmail = Deno.env.get('AUTOMATED_EMAIL_FROM') || DEFAULT_AUTOMATED_FROM const { data, error } = await supabase .from('email_senders') .select('sender_name, email_address, smtp_host, smtp_port, smtp_username, smtp_password, use_tls, use_ssl') .eq('is_active', true) .eq('email_address', fromEmail) .order('verified', { ascending: false }) .order('is_default', { ascending: false }) .order('updated_at', { ascending: false }) .limit(1) .maybeSingle() if (error) throw new Error(`Failed to load SMTP sender '${fromEmail}': ${error.message}`) if (!data) throw new Error(`No active email_senders row for '${fromEmail}'`) const port = Number(data.smtp_port ?? 587) return { host: data.smtp_host as string, port, username: data.smtp_username as string, password: data.smtp_password as string, use_ssl: port === 465 ? true : Boolean(data.use_ssl), use_tls: port === 465 ? false : (data.use_tls ?? port === 587), fromEmail: data.email_address as string, fromName: (data.sender_name as string) || undefined, } } const MAX_RETRIES = 5 const DEFAULT_BATCH_SIZE = 10 const DEFAULT_SEND_DELAY_MS = 200 const DEFAULT_AUTH_TTL_MINUTES = 15 const DEFAULT_TRANSACTIONAL_TTL_MINUTES = 60 // Check if an error is a rate-limit (429) response. // Uses EmailAPIError.status when available (email-js >=0.x with structured errors), // falls back to parsing the error message for older versions. function isRateLimited(error: unknown): boolean { if (error && typeof error === 'object' && 'status' in error) { return (error as { status: number }).status === 429 } return error instanceof Error && error.message.includes('429') } // Check if an error is a forbidden (403) response. Retrying won't help. // Move straight to DLQ. function isForbidden(error: unknown): boolean { if (error && typeof error === 'object' && 'status' in error) { return (error as { status: number }).status === 403 } return error instanceof Error && error.message.includes('403') } // Extract Retry-After seconds from a structured EmailAPIError, or default to 60s. function getRetryAfterSeconds(error: unknown): number { if (error && typeof error === 'object' && 'retryAfterSeconds' in error) { return (error as { retryAfterSeconds: number | null }).retryAfterSeconds ?? 60 } return 60 } function parseJwtClaims(token: string): Record | null { const parts = token.split('.') if (parts.length < 2) { return null } try { const payload = parts[1] .replaceAll('-', '+') .replaceAll('_', '/') .padEnd(Math.ceil(parts[1].length / 4) * 4, '=') return JSON.parse(atob(payload)) as Record } catch { return null } } // Move a message to the dead letter queue and log the reason. async function moveToDlq( supabase: ReturnType, queue: string, msg: { msg_id: number; message: Record }, reason: string ): Promise { const payload = msg.message await supabase.from('email_send_log').insert({ message_id: payload.message_id, template_name: (payload.label || queue) as string, recipient_email: payload.to, status: 'dlq', error_message: reason, }) const { error } = await supabase.rpc('move_to_dlq', { source_queue: queue, dlq_name: `${queue}_dlq`, message_id: msg.msg_id, payload, }) if (error) { console.error('Failed to move message to DLQ', { queue, msg_id: msg.msg_id, reason, error }) } } Deno.serve(async (req) => { const supabaseUrl = Deno.env.get('SUPABASE_URL') const supabaseServiceKey = Deno.env.get('SUPABASE_SERVICE_ROLE_KEY') if (!supabaseUrl || !supabaseServiceKey) { console.error('Missing required environment variables') return new Response( JSON.stringify({ error: 'Server configuration error' }), { status: 500, headers: { 'Content-Type': 'application/json' } } ) } const authHeader = req.headers.get('Authorization') if (!authHeader?.startsWith('Bearer ')) { return new Response( JSON.stringify({ error: 'Unauthorized' }), { status: 401, headers: { 'Content-Type': 'application/json' } } ) } // Defense in depth: verify_jwt=true already requires a valid JWT at the // gateway layer. This adds an explicit role check so only service-role // callers can trigger queue processing. const token = authHeader.slice('Bearer '.length).trim() const claims = parseJwtClaims(token) if (claims?.role !== 'service_role') { return new Response( JSON.stringify({ error: 'Forbidden' }), { status: 403, headers: { 'Content-Type': 'application/json' } } ) } const supabase = createClient(supabaseUrl, supabaseServiceKey) // Resolve the SMTP sender once per run (shared across the whole batch). let smtpSender: SmtpSender try { smtpSender = await loadAutomatedSender(supabase) } catch (senderErr) { console.error('Cannot load automated email sender', { error: senderErr }) return new Response( JSON.stringify({ error: 'Email sender not configured', detail: String(senderErr) }), { status: 500, headers: { 'Content-Type': 'application/json' } } ) } // Base URL for the one-click List-Unsubscribe link. const unsubscribeBase = `${supabaseUrl.replace(/\/$/, '')}/functions/v1/handle-email-unsubscribe` // Preferred transport: Hostinger Email API (when configured via secrets). // Falls back to SMTP automatically on any API error, so there is no outage // while the secrets are being set or if the API has a transient failure. const hostingerToken = Deno.env.get('HOSTINGER_MAIL_API_TOKEN') const hostingerResourceId = Deno.env.get('HOSTINGER_MAIL_RESOURCE_ID') const hostinger: HostingerMailConfig | null = hostingerToken && hostingerResourceId ? { token: hostingerToken, mailboxResourceId: hostingerResourceId } : null // 1. Check rate-limit cooldown and read queue config const { data: state } = await supabase .from('email_send_state') .select('retry_after_until, batch_size, send_delay_ms, auth_email_ttl_minutes, transactional_email_ttl_minutes') .single() if (state?.retry_after_until && new Date(state.retry_after_until) > new Date()) { return new Response( JSON.stringify({ skipped: true, reason: 'rate_limited' }), { headers: { 'Content-Type': 'application/json' } } ) } const batchSize = state?.batch_size ?? DEFAULT_BATCH_SIZE const sendDelayMs = state?.send_delay_ms ?? DEFAULT_SEND_DELAY_MS const ttlMinutes: Record = { auth_emails: state?.auth_email_ttl_minutes ?? DEFAULT_AUTH_TTL_MINUTES, transactional_emails: state?.transactional_email_ttl_minutes ?? DEFAULT_TRANSACTIONAL_TTL_MINUTES, } let totalProcessed = 0 // 2. Process auth_emails first (priority), then transactional_emails for (const queue of ['auth_emails', 'transactional_emails']) { const { data: messages, error: readError } = await supabase.rpc('read_email_batch', { queue_name: queue, batch_size: batchSize, vt: 30, }) if (readError) { console.error('Failed to read email batch', { queue, error: readError }) continue } if (!messages?.length) continue // Retry budget is based on real send failures, not pgmq read_ct. // read_ct increments for every message in a claimed batch, including // messages not attempted when a 429 stops processing early. const messageIds = Array.from( new Set( messages .map((msg) => msg?.message?.message_id && typeof msg.message.message_id === 'string' ? msg.message.message_id : null ) .filter((id): id is string => Boolean(id)) ) ) const failedAttemptsByMessageId = new Map() if (messageIds.length > 0) { const { data: failedRows, error: failedRowsError } = await supabase .from('email_send_log') .select('message_id') .in('message_id', messageIds) .eq('status', 'failed') if (failedRowsError) { console.error('Failed to load failed-attempt counters', { queue, error: failedRowsError, }) } else { for (const row of failedRows ?? []) { const messageId = row?.message_id if (typeof messageId !== 'string' || !messageId) continue failedAttemptsByMessageId.set( messageId, (failedAttemptsByMessageId.get(messageId) ?? 0) + 1 ) } } } for (let i = 0; i < messages.length; i++) { const msg = messages[i] const payload = msg.message const failedAttempts = payload?.message_id && typeof payload.message_id === 'string' ? (failedAttemptsByMessageId.get(payload.message_id) ?? 0) : msg.read_ct ?? 0 // Drop expired messages (TTL exceeded). // Prefer payload.queued_at when present; fall back to PGMQ's enqueued_at // which is always set by the queue. const queuedAt = payload.queued_at ?? msg.enqueued_at if (queuedAt) { const ageMs = Date.now() - new Date(queuedAt).getTime() const maxAgeMs = ttlMinutes[queue] * 60 * 1000 if (ageMs > maxAgeMs) { console.warn('Email expired (TTL exceeded)', { queue, msg_id: msg.msg_id, queued_at: queuedAt, ttl_minutes: ttlMinutes[queue], }) await moveToDlq(supabase, queue, msg, `TTL exceeded (${ttlMinutes[queue]} minutes)`) continue } } // Move to DLQ if max failed send attempts reached. if (failedAttempts >= MAX_RETRIES) { await moveToDlq(supabase, queue, msg, `Max retries (${MAX_RETRIES}) exceeded (attempted ${failedAttempts} times)`) continue } // Guard: skip if another worker already sent this message (VT expired race) if (payload.message_id) { const { data: alreadySent } = await supabase .from('email_send_log') .select('id') .eq('message_id', payload.message_id) .eq('status', 'sent') .maybeSingle() if (alreadySent) { console.warn('Skipping duplicate send (already sent)', { queue, msg_id: msg.msg_id, message_id: payload.message_id, }) const { error: dupDelError } = await supabase.rpc('delete_email', { queue_name: queue, message_id: msg.msg_id, }) if (dupDelError) { console.error('Failed to delete duplicate message from queue', { queue, msg_id: msg.msg_id, error: dupDelError }) } continue } } try { // Preferred: Hostinger Email API. Fall back to the project's own SMTP // sender (Lovable-free) on any API error so delivery keeps flowing. let sentVia = 'smtp' const sendSmtp = () => sendSmtpMessage(smtpSender, { to: payload.to as string, subject: payload.subject as string, html: payload.html as string, text: payload.text as string | undefined, unsubscribeUrl: payload.unsubscribe_token ? `${unsubscribeBase}?token=${payload.unsubscribe_token}` : undefined, }) if (hostinger) { try { await sendViaHostingerMail(hostinger, { to: payload.to as string, subject: payload.subject as string, html: payload.html as string, text: payload.text as string | undefined, }) sentVia = 'hostinger-api' } catch (apiErr) { console.warn('Hostinger API send failed — falling back to SMTP', { msg_id: msg.msg_id, error: apiErr instanceof Error ? apiErr.message : String(apiErr), }) await sendSmtp() } } else { await sendSmtp() } // Log success await supabase.from('email_send_log').insert({ message_id: payload.message_id, template_name: payload.label || queue, recipient_email: payload.to, status: 'sent', error_message: `via:${sentVia}`, }) // Delete from queue const { error: delError } = await supabase.rpc('delete_email', { queue_name: queue, message_id: msg.msg_id, }) if (delError) { console.error('Failed to delete sent message from queue', { queue, msg_id: msg.msg_id, error: delError }) } totalProcessed++ } catch (error) { const errorMsg = error instanceof Error ? error.message : String(error) console.error('Email send failed', { queue, msg_id: msg.msg_id, read_ct: msg.read_ct, failed_attempts: failedAttempts, error: errorMsg, }) if (isRateLimited(error)) { await supabase.from('email_send_log').insert({ message_id: payload.message_id, template_name: payload.label || queue, recipient_email: payload.to, status: 'rate_limited', error_message: errorMsg.slice(0, 1000), }) const retryAfterSecs = getRetryAfterSeconds(error) await supabase .from('email_send_state') .update({ retry_after_until: new Date( Date.now() + retryAfterSecs * 1000 ).toISOString(), updated_at: new Date().toISOString(), }) .eq('id', 1) // Stop processing — remaining messages stay in queue (VT expires, retried next cycle) return new Response( JSON.stringify({ processed: totalProcessed, stopped: 'rate_limited' }), { headers: { 'Content-Type': 'application/json' } } ) } // 403s are permanent configuration or authorization failures for this // message, so move straight to DLQ and stop processing the rest of the batch. if (isForbidden(error)) { await moveToDlq(supabase, queue, msg, errorMsg.slice(0, 1000)) return new Response( JSON.stringify({ processed: totalProcessed, stopped: 'forbidden' }), { headers: { 'Content-Type': 'application/json' } } ) } // Log non-429 failures to track real retry attempts. await supabase.from('email_send_log').insert({ message_id: payload.message_id, template_name: payload.label || queue, recipient_email: payload.to, status: 'failed', error_message: errorMsg.slice(0, 1000), }) if (payload?.message_id && typeof payload.message_id === 'string') { failedAttemptsByMessageId.set(payload.message_id, failedAttempts + 1) } // Non-429 errors: message stays invisible until VT expires, then retried } // Small delay between sends to smooth bursts if (i < messages.length - 1) { await new Promise((r) => setTimeout(r, sendDelayMs)) } } } return new Response( JSON.stringify({ processed: totalProcessed }), { headers: { 'Content-Type': 'application/json' } } ) })