Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 7 additions & 14 deletions internal/admin/sqs_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,9 @@ type QueueSummary struct {
CreatedAt *time.Time `json:"created_at,omitempty"`
Attributes map[string]string `json:"attributes,omitempty"`
Counters QueueCounters `json:"counters"`
// IsDLQ is true when at least one other queue's RedrivePolicy
// resolves its deadLetterTargetArn to this queue. The SPA uses
// the flag to switch the Messages-tab framing and the Purge
// button label between "Purge messages" and "Purge DLQ".
// True when another queue's RedrivePolicy points at this one.
IsDLQ bool `json:"is_dlq"`
// DLQSources lists the names of queues whose RedrivePolicy
// points at this queue, sorted lexicographically. Empty when
// IsDLQ is false; the SPA renders these as chips on the queue
// detail page so the operator confirms what feeds the DLQ
// before purging.
// Source queue names that point at this DLQ, sorted lex.
DLQSources []string `json:"dlq_sources,omitempty"`
}

Expand Down Expand Up @@ -698,10 +691,9 @@ func writeQueuesError(w http.ResponseWriter, err error, logger *slog.Logger, r *
}
}

// writePurgeInProgress emits the 429 response shape the design doc
// §3.4 specifies: Retry-After header (rounded up to whole seconds so
// a client retrying exactly at the deadline is guaranteed to clear)
// + JSON body { code, message, retry_after_seconds }.
// writePurgeInProgress emits the 429 wire shape (Retry-After header
// + JSON body { error, message, retry_after_seconds }). Whole-second
// rounding-up so a deadline-exact retry is guaranteed to clear.
func writePurgeInProgress(w http.ResponseWriter, err *PurgeInProgressError) {
secs := int64(err.RetryAfter / time.Second)
if err.RetryAfter%time.Second != 0 {
Expand All @@ -712,10 +704,11 @@ func writePurgeInProgress(w http.ResponseWriter, err *PurgeInProgressError) {
}
w.Header().Set("Retry-After", strconv.FormatInt(secs, 10))
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.Header().Set("X-Content-Type-Options", "nosniff")
w.Header().Set("Cache-Control", "no-store")
w.WriteHeader(http.StatusTooManyRequests)
_ = json.NewEncoder(w).Encode(map[string]any{
"code": "PurgeQueueInProgress",
"error": "PurgeQueueInProgress",
"message": "only one PurgeQueue operation on each queue is allowed every 60 seconds",
"retry_after_seconds": secs,
})
Expand Down
5 changes: 4 additions & 1 deletion internal/admin/sqs_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,10 @@ func TestSqsHandler_PurgeQueue_RateLimited429(t *testing.T) {
require.Equal(t, "43", rec.Header().Get("Retry-After"))
var body map[string]any
require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &body))
require.Equal(t, "PurgeQueueInProgress", body["code"])
// The "error" key (not "code") matches writeJSONError's envelope
// so apiFetch.ts can extract the AWS-style sentinel consistently
// with every other 4xx error.
require.Equal(t, "PurgeQueueInProgress", body["error"])
require.EqualValues(t, 43, body["retry_after_seconds"])
}

Expand Down
51 changes: 51 additions & 0 deletions web/admin/src/api/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,48 @@ export interface SqsQueueSummary {
created_at?: string;
attributes?: Record<string, string>;
counters: SqsQueueCounters;
// True when another queue's RedrivePolicy points at this one.
is_dlq: boolean;
// Source queue names that point at this DLQ, sorted lex.
dlq_sources?: string[];
}

export interface SqsQueueList {
queues: string[];
}

// SqsPeekedAttribute mirrors AWS's typed MessageAttribute shape;
// binary_value arrives base64-encoded.
export interface SqsPeekedAttribute {
data_type: string;
string_value?: string;
binary_value?: string;
}

export interface SqsPeekedMessage {
message_id: string;
body: string;
body_truncated: boolean;
body_original_size: number;
sent_timestamp: string;
receive_count: number;
group_id?: string;
deduplication_id?: string;
attributes?: Record<string, SqsPeekedAttribute>;
}

export interface SqsPeekResult {
messages: SqsPeekedMessage[];
// Omitted when the walk has fully completed for this MVCC snapshot.
next_cursor?: string;
}

export interface SqsPeekOptions {
limit?: number;
cursor?: string;
body_max_bytes?: number;
}

// KeyViz wire shapes mirror internal/admin/keyviz_handler.go
// (KeyVizMatrix / KeyVizRow). Go []byte fields arrive as
// base64-encoded strings via encoding/json — keep them as `string` on
Expand Down Expand Up @@ -311,6 +347,21 @@ export const api = {
apiFetch<SqsQueueSummary>(`/sqs/queues/${encodeURIComponent(name)}`, { signal }),
deleteQueue: (name: string) =>
apiFetch<void>(`/sqs/queues/${encodeURIComponent(name)}`, { method: "DELETE" }),
// Non-destructive peek of currently-visible messages. Server clamps
// limit to [1, 100] and body_max_bytes to [256, 262144].
peekQueue: (name: string, opts?: SqsPeekOptions, signal?: AbortSignal) =>
apiFetch<SqsPeekResult>(`/sqs/queues/${encodeURIComponent(name)}/messages`, {
query: {
limit: opts?.limit,
cursor: opts?.cursor,
body_max_bytes: opts?.body_max_bytes,
},
signal,
}),
// Drains the queue's messages while leaving meta/ARN/RedrivePolicy intact.
// 60-second rate limit per queue: second purge inside the window → 429.
purgeQueue: (name: string) =>
apiFetch<void>(`/sqs/queues/${encodeURIComponent(name)}/messages`, { method: "DELETE" }),
keyVizMatrix: (params: KeyVizParams, signal?: AbortSignal) =>
apiFetch<KeyVizMatrix>("/keyviz/matrix", {
query: {
Expand Down
Loading
Loading