diff --git a/README.md b/README.md index b14491b..00ea37d 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,8 @@ It is designed to be simple to deploy and can run either: | `promgithub_event_queue_capacity` | Gauge | none | Configured capacity of the webhook event queue | | `promgithub_event_worker_count` | Gauge | none | Configured number of async webhook event workers | | `promgithub_event_processed_total` | Counter | `event_type` | Total number of webhook events processed asynchronously | -| `promgithub_event_dropped_total` | Counter | `event_type`, `reason` | Total number of webhook events dropped before processing | +| `promgithub_event_queue_dropped_total` | Counter | `event_type` | Total number of webhook events dropped because the async processing queue was full | +| `promgithub_event_unsupported_total` | Counter | `event_type` | Total number of unsupported webhook events received by the async processor | | `promgithub_event_processing_failures_total` | Counter | `event_type` | Total number of async webhook processing failures | | `promgithub_event_processing_duration_seconds` | Histogram | `event_type` | Duration of async webhook event processing | | `promgithub_duplicate_deliveries_seen_total` | Counter | `event_type` | Duplicate webhook deliveries observed | diff --git a/docs/usage.md b/docs/usage.md index f82e426..fa5bc73 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -21,9 +21,23 @@ The service supports the following environment variables: - `PROMGITHUB_REDIS_DB` (optional): Redis database number, default `0`. - `PROMGITHUB_REDIS_KEY_PREFIX` (optional): Prefix used for Redis keys, default `promgithub`. - `PROMGITHUB_REDIS_DELIVERY_TTL` (optional): TTL for webhook delivery dedupe keys, default `24h`. +- `PROMGITHUB_EVENT_WORKERS` (optional): Number of async webhook processing workers, default `4`. +- `PROMGITHUB_EVENT_QUEUE_SIZE` (optional): Bounded async webhook queue size, default `256`. 
If Redis is configured, the service stores delivery and run state in Redis. +### Async processing and backpressure + +Webhook requests are acknowledged after signature validation, duplicate-delivery recording, and enqueueing into the bounded async processor. + +- Accepted events return `202 Accepted` and are processed by background workers. +- Duplicate deliveries return `200 OK` and do not enqueue duplicate work. +- If the queue is full, the request returns `503 Service Unavailable` and increments `promgithub_event_queue_dropped_total` with the event's `event_type` label. +- Processing panics are recovered, logged, and exposed via `promgithub_event_processing_failures_total`; workers continue handling later events. +- On graceful termination, the processor stops accepting new events and drains accepted in-flight/queued events before exit. + +Watch `promgithub_event_queue_depth`, `promgithub_event_queue_capacity`, `promgithub_event_worker_count`, `promgithub_event_processed_total`, `promgithub_event_queue_dropped_total`, `promgithub_event_unsupported_total`, and `promgithub_event_processing_failures_total` to tune worker and queue settings. 
+ ## Running the service ### Run the binary diff --git a/src/async.go b/src/async.go index bf26f0d..7d4871f 100644 --- a/src/async.go +++ b/src/async.go @@ -102,7 +102,7 @@ func (p *asyncEventProcessor) Enqueue(ctx context.Context, eventType string, bod asyncQueueDepthGauge.Set(float64(len(p.queue))) return nil default: - asyncEventsDroppedCounter.WithLabelValues(eventType, "queue_full").Inc() + asyncQueueDroppedCounter.WithLabelValues(eventType).Inc() asyncQueueDepthGauge.Set(float64(len(p.queue))) return fmt.Errorf("event queue is full") } @@ -117,7 +117,7 @@ func (p *asyncEventProcessor) runWorker(workerID int) { processor, ok := p.processFn[event.eventType] if !ok { - asyncEventsDroppedCounter.WithLabelValues(event.eventType, "unsupported_event").Inc() + asyncUnsupportedEventsCounter.WithLabelValues(event.eventType).Inc() continue } diff --git a/src/async_test.go b/src/async_test.go index fec0843..17162ea 100644 --- a/src/async_test.go +++ b/src/async_test.go @@ -13,7 +13,8 @@ import ( func TestAsyncProcessorEnqueueAndProcess(t *testing.T) { asyncProcessedEventsCounter.Reset() - asyncEventsDroppedCounter.Reset() + asyncQueueDroppedCounter.Reset() + asyncUnsupportedEventsCounter.Reset() asyncProcessingFailuresCounter.Reset() asyncQueueDepthGauge.Set(0) asyncQueueCapacityGauge.Set(0) @@ -51,7 +52,7 @@ func TestAsyncProcessorEnqueueAndProcess(t *testing.T) { } func TestAsyncProcessorDropsWhenQueueFull(t *testing.T) { - asyncEventsDroppedCounter.Reset() + asyncQueueDroppedCounter.Reset() asyncQueueDepthGauge.Set(0) blocker := make(chan struct{}) @@ -72,7 +73,7 @@ func TestAsyncProcessorDropsWhenQueueFull(t *testing.T) { t.Fatal("expected queue full error") } - if got := testutil.ToFloat64(asyncEventsDroppedCounter.WithLabelValues("workflow_run", "queue_full")); got != 1 { + if got := testutil.ToFloat64(asyncQueueDroppedCounter.WithLabelValues("workflow_run")); got != 1 { t.Fatalf("expected dropped counter to be 1, got %v", got) } } diff --git a/src/github_test.go 
b/src/github_test.go index 6ef88d9..6f681bd 100644 --- a/src/github_test.go +++ b/src/github_test.go @@ -41,7 +41,8 @@ func resetWebhookTestState() { commitPushedCounter.Reset() pullRequestCounter.Reset() asyncProcessedEventsCounter.Reset() - asyncEventsDroppedCounter.Reset() + asyncQueueDroppedCounter.Reset() + asyncUnsupportedEventsCounter.Reset() asyncProcessingFailuresCounter.Reset() asyncProcessingDurationHistogram.Reset() duplicateDeliveriesSeenCounter.Reset() diff --git a/src/integration_test.go b/src/integration_test.go index 24e933c..d90cdd0 100644 --- a/src/integration_test.go +++ b/src/integration_test.go @@ -4,6 +4,7 @@ package main import ( "bytes" + "context" "crypto/hmac" "crypto/sha256" "encoding/hex" @@ -13,6 +14,7 @@ import ( "net/http/httptest" "os" "strings" + "sync/atomic" "testing" "time" @@ -120,10 +122,13 @@ func TestIntegrationWebhookUnsupportedEvent(t *testing.T) { t.Fatalf("expected status %d, got %d", http.StatusAccepted, resp.StatusCode) } - metrics := mustFetchMetrics(t, server.URL) + metrics := waitForMetricsSubstring(t, server.URL, `promgithub_event_unsupported_total{event_type="unknown_event"} 1`) if strings.Contains(metrics, `promgithub_workflow_status{branch="main",conclusion="success",repository="user/repo",workflow_name="CI",workflow_status="completed"} 1`) { t.Fatalf("unsupported event unexpectedly updated workflow metrics:\n%s", metrics) } + if !strings.Contains(metrics, `promgithub_event_unsupported_total{event_type="unknown_event"} 1`) { + t.Fatalf("expected unsupported event metric, got:\n%s", metrics) + } } func TestIntegrationHealthAndMetricsEndpoints(t *testing.T) { @@ -183,16 +188,144 @@ func TestIntegrationDuplicateDeliveryDoesNotInflateMetrics(t *testing.T) { } } +func TestIntegrationAsyncQueueFullReturnsUnavailableAndExposesQueueDropMetrics(t *testing.T) { + server := newIntegrationTestServerWithAsyncConfig(t, asyncProcessorConfig{WorkerCount: 1, QueueSize: 1}) + defer server.Close() + + started := make(chan 
struct{}) + unblock := make(chan struct{}) + eventProcessor.processFn["workflow_run"] = func(_ context.Context, _ []byte) { + select { + case <-started: + default: + close(started) + } + <-unblock + } + t.Cleanup(func() { + select { + case <-unblock: + default: + close(unblock) + } + }) + + body := mustReadFixture(t, "workflow_run.json") + first := sendWebhookRequest(t, server.URL, "workflow_run", body, "delivery-queue-full-1") + assertResponseStatus(t, first, http.StatusAccepted) + + select { + case <-started: + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for first async event to start processing") + } + + second := sendWebhookRequest(t, server.URL, "workflow_run", body, "delivery-queue-full-2") + assertResponseStatus(t, second, http.StatusAccepted) + + third := sendWebhookRequest(t, server.URL, "workflow_run", body, "delivery-queue-full-3") + assertResponseStatus(t, third, http.StatusServiceUnavailable) + + metrics := waitForMetricsSubstring(t, server.URL, `promgithub_event_queue_dropped_total{event_type="workflow_run"} 1`) + if !strings.Contains(metrics, `promgithub_event_queue_dropped_total{event_type="workflow_run"} 1`) { + t.Fatalf("expected queue-full drop metric, got:\n%s", metrics) + } +} + +func TestIntegrationAsyncProcessingFailureIsVisibleAndWorkerContinues(t *testing.T) { + server := newIntegrationTestServer(t) + defer server.Close() + + var attempts atomic.Int32 + eventProcessor.processFn["workflow_run"] = func(ctx context.Context, body []byte) { + if attempts.Add(1) == 1 { + panic("synthetic async processor failure") + } + + updateWorkflowMetrics(ctx, body) + } + + body := mustReadFixture(t, "workflow_run.json") + first := sendWebhookRequest(t, server.URL, "workflow_run", body, "delivery-failure-1") + assertResponseStatus(t, first, http.StatusAccepted) + + metrics := waitForMetricsSubstring(t, server.URL, `promgithub_event_processing_failures_total{event_type="workflow_run"} 1`) + if !strings.Contains(metrics, 
`promgithub_event_processing_failures_total{event_type="workflow_run"} 1`) { + t.Fatalf("expected async processing failure metric, got:\n%s", metrics) + } + + second := sendWebhookRequest(t, server.URL, "workflow_run", body, "delivery-failure-2") + assertResponseStatus(t, second, http.StatusAccepted) + + metrics = waitForMetricsSubstring(t, server.URL, `promgithub_workflow_status{branch="main",conclusion="success",repository="user/repo",workflow_name="CI",workflow_status="completed"} 1`) + if !strings.Contains(metrics, `promgithub_event_processed_total{event_type="workflow_run"} 1`) { + t.Fatalf("expected worker to continue and process the second event, got:\n%s", metrics) + } +} + +func TestIntegrationAsyncShutdownDrainsQueuedEvents(t *testing.T) { + server := newIntegrationTestServerWithAsyncConfig(t, asyncProcessorConfig{WorkerCount: 1, QueueSize: 2}) + defer server.Close() + + started := make(chan struct{}) + unblock := make(chan struct{}) + eventProcessor.processFn["workflow_run"] = func(ctx context.Context, body []byte) { + select { + case <-started: + default: + close(started) + } + <-unblock + updateWorkflowMetrics(ctx, body) + } + t.Cleanup(func() { + select { + case <-unblock: + default: + close(unblock) + } + }) + + body := mustReadFixture(t, "workflow_run.json") + first := sendWebhookRequest(t, server.URL, "workflow_run", body, "delivery-shutdown-1") + assertResponseStatus(t, first, http.StatusAccepted) + + select { + case <-started: + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for first async event to start processing") + } + + second := sendWebhookRequest(t, server.URL, "workflow_run", body, "delivery-shutdown-2") + assertResponseStatus(t, second, http.StatusAccepted) + + close(unblock) + eventProcessor.Stop() + eventProcessor = nil + + metrics := mustFetchMetrics(t, server.URL) + if !strings.Contains(metrics, `promgithub_event_processed_total{event_type="workflow_run"} 2`) { + t.Fatalf("expected shutdown to drain both accepted 
events, got:\n%s", metrics) + } +} + func newIntegrationTestServer(t *testing.T) *httptest.Server { + t.Helper() + return newIntegrationTestServerWithAsyncConfig(t, asyncProcessorConfig{WorkerCount: 1, QueueSize: 8}) +} + +func newIntegrationTestServerWithAsyncConfig(t *testing.T, cfg asyncProcessorConfig) *httptest.Server { t.Helper() resetIntegrationTestMetrics() githubWebhookSecret = []byte("integration-test-secret") stateStore = newInMemoryStateStore() - eventProcessor = newAsyncEventProcessor(asyncProcessorConfig{WorkerCount: 1, QueueSize: 8}, zap.NewNop()) + eventProcessor = newAsyncEventProcessor(cfg, zap.NewNop()) eventProcessor.Start() t.Cleanup(func() { - eventProcessor.Stop() + if eventProcessor != nil { + eventProcessor.Stop() + } eventProcessor = nil stateStore = nil }) @@ -201,6 +334,16 @@ func newIntegrationTestServer(t *testing.T) *httptest.Server { return httptest.NewServer(router) } +func assertResponseStatus(t *testing.T, resp *http.Response, expectedStatus int) { + t.Helper() + defer func() { _ = resp.Body.Close() }() + + if resp.StatusCode != expectedStatus { + body, _ := io.ReadAll(resp.Body) + t.Fatalf("expected status %d, got %d with body %q", expectedStatus, resp.StatusCode, strings.TrimSpace(string(body))) + } +} + func resetIntegrationTestMetrics() { workflowStatusCounter.Reset() workflowDurationHistogram.Reset() @@ -215,7 +358,8 @@ func resetIntegrationTestMetrics() { commitPushedCounter.Reset() pullRequestCounter.Reset() asyncProcessedEventsCounter.Reset() - asyncEventsDroppedCounter.Reset() + asyncQueueDroppedCounter.Reset() + asyncUnsupportedEventsCounter.Reset() asyncProcessingFailuresCounter.Reset() asyncProcessingDurationHistogram.Reset() duplicateDeliveriesSeenCounter.Reset() diff --git a/src/metrics.go b/src/metrics.go index 1dc91bd..d222fff 100644 --- a/src/metrics.go +++ b/src/metrics.go @@ -135,12 +135,20 @@ var ( []string{"event_type"}, ) - asyncEventsDroppedCounter = promauto.NewCounterVec( + asyncQueueDroppedCounter = 
promauto.NewCounterVec( prometheus.CounterOpts{ - Name: "promgithub_event_dropped_total", - Help: "Total number of webhook events dropped before processing", + Name: "promgithub_event_queue_dropped_total", + Help: "Total number of webhook events dropped because the async processing queue was full", }, - []string{"event_type", "reason"}, + []string{"event_type"}, + ) + + asyncUnsupportedEventsCounter = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "promgithub_event_unsupported_total", + Help: "Total number of unsupported webhook events received by the async processor", + }, + []string{"event_type"}, ) asyncProcessingFailuresCounter = promauto.NewCounterVec( diff --git a/src/redis_integration_test.go b/src/redis_integration_test.go index 403ba0a..9a5aa0a 100644 --- a/src/redis_integration_test.go +++ b/src/redis_integration_test.go @@ -39,10 +39,8 @@ func TestRedisIntegrationDuplicateDeliverySharedAcrossServers(t *testing.T) { } _ = resp.Body.Close() - metrics := waitForMetricsSubstring(t, serverA.URL, `promgithub_duplicate_deliveries_seen_total{event_type="workflow_run"} 1`) - if !strings.Contains(metrics, `promgithub_workflow_status{branch="main",conclusion="success",repository="user/repo",workflow_name="CI",workflow_status="completed"} 1`) { - t.Fatalf("expected workflow metric to be recorded once, got:\n%s", metrics) - } + waitForMetricsSubstring(t, serverA.URL, `promgithub_duplicate_deliveries_seen_total{event_type="workflow_run"} 1`) + metrics := waitForMetricsSubstring(t, serverA.URL, `promgithub_workflow_status{branch="main",conclusion="success",repository="user/repo",workflow_name="CI",workflow_status="completed"} 1`) if !strings.Contains(metrics, `promgithub_duplicate_deliveries_dropped_total{event_type="workflow_run"} 1`) { t.Fatalf("expected duplicate delivery to be dropped by shared Redis state, got:\n%s", metrics) }