diff --git a/internal/admin/sqs_handler.go b/internal/admin/sqs_handler.go index 1c090d87..11ecf2c6 100644 --- a/internal/admin/sqs_handler.go +++ b/internal/admin/sqs_handler.go @@ -40,16 +40,9 @@ type QueueSummary struct { CreatedAt *time.Time `json:"created_at,omitempty"` Attributes map[string]string `json:"attributes,omitempty"` Counters QueueCounters `json:"counters"` - // IsDLQ is true when at least one other queue's RedrivePolicy - // resolves its deadLetterTargetArn to this queue. The SPA uses - // the flag to switch the Messages-tab framing and the Purge - // button label between "Purge messages" and "Purge DLQ". + // True when another queue's RedrivePolicy points at this one. IsDLQ bool `json:"is_dlq"` - // DLQSources lists the names of queues whose RedrivePolicy - // points at this queue, sorted lexicographically. Empty when - // IsDLQ is false; the SPA renders these as chips on the queue - // detail page so the operator confirms what feeds the DLQ - // before purging. + // Source queue names that point at this DLQ, sorted lex. DLQSources []string `json:"dlq_sources,omitempty"` } @@ -698,10 +691,9 @@ func writeQueuesError(w http.ResponseWriter, err error, logger *slog.Logger, r * } } -// writePurgeInProgress emits the 429 response shape the design doc -// §3.4 specifies: Retry-After header (rounded up to whole seconds so -// a client retrying exactly at the deadline is guaranteed to clear) -// + JSON body { code, message, retry_after_seconds }. +// writePurgeInProgress emits the 429 wire shape (Retry-After header +// + JSON body { error, message, retry_after_seconds }). Whole-second +// rounding-up so a deadline-exact retry is guaranteed to clear. func writePurgeInProgress(w http.ResponseWriter, err *PurgeInProgressError) { secs := int64(err.RetryAfter / time.Second) if err.RetryAfter%time.Second != 0 { @@ -712,10 +704,11 @@ func writePurgeInProgress(w http.ResponseWriter, err *PurgeInProgressError) { } w.Header().Set("Retry-After", strconv.FormatInt(secs, 10)) w.Header().Set("Content-Type", "application/json; charset=utf-8") + w.Header().Set("X-Content-Type-Options", "nosniff") w.Header().Set("Cache-Control", "no-store") w.WriteHeader(http.StatusTooManyRequests) _ = json.NewEncoder(w).Encode(map[string]any{ - "code": "PurgeQueueInProgress", + "error": "PurgeQueueInProgress", "message": "only one PurgeQueue operation on each queue is allowed every 60 seconds", "retry_after_seconds": secs, }) diff --git a/internal/admin/sqs_handler_test.go b/internal/admin/sqs_handler_test.go index f39fd01a..2309c8ad 100644 --- a/internal/admin/sqs_handler_test.go +++ b/internal/admin/sqs_handler_test.go @@ -347,7 +347,10 @@ func TestSqsHandler_PurgeQueue_RateLimited429(t *testing.T) { require.Equal(t, "43", rec.Header().Get("Retry-After")) var body map[string]any require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &body)) - require.Equal(t, "PurgeQueueInProgress", body["code"]) + // The "error" key (not "code") matches writeJSONError's envelope + // so apiFetch.ts can extract the AWS-style sentinel consistently + // with every other 4xx error. + require.Equal(t, "PurgeQueueInProgress", body["error"]) require.EqualValues(t, 43, body["retry_after_seconds"]) } diff --git a/web/admin/src/api/client.ts b/web/admin/src/api/client.ts index ad8a318a..4c248437 100644 --- a/web/admin/src/api/client.ts +++ b/web/admin/src/api/client.ts @@ -210,12 +210,48 @@ export interface SqsQueueSummary { created_at?: string; attributes?: Record; counters: SqsQueueCounters; + // True when another queue's RedrivePolicy points at this one. + is_dlq: boolean; + // Source queue names that point at this DLQ, sorted lex. + dlq_sources?: string[]; } export interface SqsQueueList { queues: string[]; } +// SqsPeekedAttribute mirrors AWS's typed MessageAttribute shape; +// binary_value arrives base64-encoded. +export interface SqsPeekedAttribute { + data_type: string; + string_value?: string; + binary_value?: string; +} + +export interface SqsPeekedMessage { + message_id: string; + body: string; + body_truncated: boolean; + body_original_size: number; + sent_timestamp: string; + receive_count: number; + group_id?: string; + deduplication_id?: string; + attributes?: Record; +} + +export interface SqsPeekResult { + messages: SqsPeekedMessage[]; + // Omitted when the walk has fully completed for this MVCC snapshot. + next_cursor?: string; +} + +export interface SqsPeekOptions { + limit?: number; + cursor?: string; + body_max_bytes?: number; +} + // KeyViz wire shapes mirror internal/admin/keyviz_handler.go // (KeyVizMatrix / KeyVizRow). Go []byte fields arrive as // base64-encoded strings via encoding/json — keep them as `string` on @@ -311,6 +347,21 @@ export const api = { apiFetch(`/sqs/queues/${encodeURIComponent(name)}`, { signal }), deleteQueue: (name: string) => apiFetch(`/sqs/queues/${encodeURIComponent(name)}`, { method: "DELETE" }), + // Non-destructive peek of currently-visible messages. Server clamps + // limit to [1, 100] and body_max_bytes to [256, 262144]. + peekQueue: (name: string, opts?: SqsPeekOptions, signal?: AbortSignal) => + apiFetch(`/sqs/queues/${encodeURIComponent(name)}/messages`, { + query: { + limit: opts?.limit, + cursor: opts?.cursor, + body_max_bytes: opts?.body_max_bytes, + }, + signal, + }), + // Drains the queue's messages while leaving meta/ARN/RedrivePolicy intact. + // 60-second rate limit per queue: second purge inside the window → 429. + purgeQueue: (name: string) => + apiFetch(`/sqs/queues/${encodeURIComponent(name)}/messages`, { method: "DELETE" }), keyVizMatrix: (params: KeyVizParams, signal?: AbortSignal) => apiFetch("/keyviz/matrix", { query: { diff --git a/web/admin/src/pages/SqsDetail.tsx b/web/admin/src/pages/SqsDetail.tsx index 2ad32c7c..73bdb56a 100644 --- a/web/admin/src/pages/SqsDetail.tsx +++ b/web/admin/src/pages/SqsDetail.tsx @@ -1,9 +1,28 @@ -import { useState } from "react"; +import { useCallback, useEffect, useRef, useState } from "react"; import { Link, useNavigate, useParams } from "react-router-dom"; -import { api } from "../api/client"; +import { + api, + type SqsPeekResult, + type SqsPeekedMessage, +} from "../api/client"; import { Modal } from "../components/Modal"; import { formatApiError, useApiQuery } from "../lib/useApi"; +// kPeekPageSize is the documented Phase 5 default the SPA sends on +// every Messages-tab fetch. The server clamps to [1, 100]; staying +// at 20 keeps the worst-case response (20 rows × 256 KiB) at 5 MiB +// well under typical network and JSON-parse budgets. +const kPeekPageSize = 20; + +// kPeekBodyMaxBytes is the eager full-body fetch size: 256 KiB matches +// SQS's hard cap on stored message size, so the detail modal renders +// directly from the row already in memory — no re-peek round-trip on +// modal open, eliminating the "row disappeared between list and +// modal" failure modes (concurrent purge, ReceiveMessage from another +// client, visibility timer started). Trade is initial fetch size +// for modal-open consistency; design doc §3.5 lays out the reasoning. +const kPeekBodyMaxBytes = 262144; + export function SqsDetailPage() { const { name = "" } = useParams<{ name: string }>(); const detail = useApiQuery((signal) => api.describeQueue(name, signal), [name]); @@ -11,14 +30,10 @@ export function SqsDetailPage() { const [deleting, setDeleting] = useState(false); const [deleteError, setDeleteError] = useState(null); const navigate = useNavigate(); - // The delete button is gated by the backend's live-role check - // (internal/admin/sqs_handler.go principalForWrite), not the JWT - // role cached in this session. A JWT minted as read_only stays - // read_only in the cookie until logout, but the operator may have - // been promoted to full in the live role store after login — so - // gating the button on session.role would hide it for users who - // are currently authorized. A read_only operator who clicks delete - // gets a 403 from the backend, surfaced in the modal's error area. + // The delete / purge buttons are gated by the backend's live-role + // check (internal/admin/sqs_handler.go principalForWrite), not the + // JWT role cached in this session. See SqsDetail's pre-Phase-5 + // comment for the rationale. const onDelete = async () => { setDeleting(true); @@ -42,6 +57,11 @@ export function SqsDetailPage() { {detail.data.is_fifo ? "FIFO" : "Standard"} )} + {detail.data?.is_dlq && ( + + DLQ + + )} {detail.data && ( + +
+ Showing {result?.messages.length ?? 0} visible message + {(result?.messages.length ?? 0) === 1 ? "" : "s"} + {inFlightCount > 0 && ( + <> ({inFlightCount} currently in-flight, not shown) + )} +
+ {error &&
{error}
} + {loading &&
Loading…
} + {!loading && result && result.messages.length === 0 && ( +
No visible messages.
+ )} + {!loading && result && result.messages.length > 0 && ( + setSelected(m)} + /> + )} +
+ + + +
+ + setSelected(null)}> + {selected && } + + + { + if (purging) return; + setConfirmPurge(false); + setPurgeName(""); + setPurgeError(null); + }} + busy={purging} + > +

{purgeConfirmCopy}

+ + setPurgeName(e.target.value)} + disabled={purging} + autoFocus + /> + {purgeError &&
{purgeError}
} +
+ + +
+
+ + ); +} + +interface MessagesTableProps { + messages: SqsPeekedMessage[]; + showGroup: boolean; + onSelect: (m: SqsPeekedMessage) => void; +} + +function MessagesTable({ messages, showGroup, onSelect }: MessagesTableProps) { + return ( + + + + + + {showGroup && } + + + + + + + {messages.map((m) => ( + onSelect(m)} + > + + + {showGroup && } + + + + + ))} + +
Message IDSentGroupRecvBodySize
+ {m.message_id.slice(0, 8)} + {new Date(m.sent_timestamp).toLocaleString()}{m.group_id ?? ""} + {m.receive_count} + {previewBody(m)}{formatBytes(m.body_original_size)}
+ ); +} + +interface MessageDetailProps { + message: SqsPeekedMessage; + queue: string; +} + +function MessageDetail({ message, queue }: MessageDetailProps) { + const [copied, setCopied] = useState(false); + const [copyError, setCopyError] = useState(null); + const onCopyJson = async () => { + // Schema version per design doc §3.5; downstream tooling pins it. + const payload = { + schema_version: 1, + queue, + exported_at: new Date().toISOString(), + message, + }; + const json = JSON.stringify(payload, null, 2); + // navigator.clipboard is secure-context-only. On insecure / + // unavailable, point the operator at the body
 in the
+    // modal rather than window.prompt (blocking modal pre-empts the
+    // error message and some browsers truncate ~350KiB payloads).
+    if (typeof navigator !== "undefined" && navigator.clipboard?.writeText) {
+      try {
+        await navigator.clipboard.writeText(json);
+        setCopied(true);
+        setCopyError(null);
+        setTimeout(() => setCopied(false), 1500);
+        return;
+      } catch (err) {
+        setCopyError(`Copy failed: ${String(err)}. Select the body text above to copy manually.`);
+        return;
+      }
+    }
+    setCopyError("Clipboard API unavailable (insecure context). Select the body text above to copy manually.");
+  };
+
+  return (
+    
+
+
Message ID
+
{message.message_id}
+
Sent
+
{new Date(message.sent_timestamp).toLocaleString()}
+
Receive count
+
{message.receive_count}
+ {message.group_id && ( + <> +
Group ID
+
{message.group_id}
+ + )} + {message.deduplication_id && ( + <> +
Deduplication ID
+
{message.deduplication_id}
+ + )} +
Original size
+
{formatBytes(message.body_original_size)}
+
+
+
+ Body + {message.body_truncated && ( + + Truncated to {formatBytes(utf8ByteLength(message.body))} of {formatBytes(message.body_original_size)} + + )} +
+
+          {message.body}
+        
+
+ {message.attributes && Object.keys(message.attributes).length > 0 && ( +
+
Attributes
+
+ {Object.entries(message.attributes).map(([k, v]) => ( +
+
{k}
+
{v.data_type}
+
+ {v.string_value ?? (v.binary_value ? `` : "")} +
+
+ ))} +
+
+ )} + {copyError &&
{copyError}
} +
+ +
+
+ ); +} + function CounterCard({ label, value }: { label: string; value: number }) { return (
@@ -145,3 +543,33 @@ function CounterCard({ label, value }: { label: string; value: number }) {
); } + +function previewBody(m: SqsPeekedMessage): string { + const max = 96; + if (m.body.length <= max) return m.body || "(empty)"; + return m.body.slice(0, max) + "…"; +} + +function formatBytes(n: number): string { + if (n < 1024) return `${n} B`; + if (n < 1024 * 1024) return `${(n / 1024).toFixed(1)} KiB`; + return `${(n / (1024 * 1024)).toFixed(1)} MiB`; +} + +// utf8ByteLength: server applies BodyMaxBytes to UTF-8 bytes, not +// UTF-16 code units; CJK/emoji bodies would otherwise under-report. +function utf8ByteLength(s: string): number { + if (typeof TextEncoder !== "undefined") { + return new TextEncoder().encode(s).byteLength; + } + return s.length; +} + +// Decoded length without actually decoding: 4 chars → 3 bytes, minus padding. +function base64DecodedByteLength(b64: string): number { + if (b64.length === 0) return 0; + let padding = 0; + if (b64.endsWith("==")) padding = 2; + else if (b64.endsWith("=")) padding = 1; + return Math.floor(b64.length * 3 / 4) - padding; +}