Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ It is designed to be simple to deploy and can run either:
| `promgithub_event_queue_capacity` | Gauge | none | Configured capacity of the webhook event queue |
| `promgithub_event_worker_count` | Gauge | none | Configured number of async webhook event workers |
| `promgithub_event_processed_total` | Counter | `event_type` | Total number of webhook events processed asynchronously |
| `promgithub_event_dropped_total` | Counter | `event_type`, `reason` | Total number of webhook events dropped before processing |
| `promgithub_event_queue_dropped_total` | Counter | `event_type` | Total number of webhook events dropped because the async processing queue was full |
| `promgithub_event_unsupported_total` | Counter | `event_type` | Total number of unsupported webhook events received by the async processor |
| `promgithub_event_processing_failures_total` | Counter | `event_type` | Total number of async webhook processing failures |
| `promgithub_event_processing_duration_seconds` | Histogram | `event_type` | Duration of async webhook event processing |
| `promgithub_duplicate_deliveries_seen_total` | Counter | `event_type` | Duplicate webhook deliveries observed |
Expand Down
14 changes: 14 additions & 0 deletions docs/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,23 @@ The service supports the following environment variables:
- `PROMGITHUB_REDIS_DB` (optional): Redis database number, default `0`.
- `PROMGITHUB_REDIS_KEY_PREFIX` (optional): Prefix used for Redis keys, default `promgithub`.
- `PROMGITHUB_REDIS_DELIVERY_TTL` (optional): TTL for webhook delivery dedupe keys, default `24h`.
- `PROMGITHUB_EVENT_WORKERS` (optional): Number of async webhook processing workers, default `4`.
- `PROMGITHUB_EVENT_QUEUE_SIZE` (optional): Bounded async webhook queue size, default `256`.

If Redis is configured, the service stores delivery and run state in Redis.

### Async processing and backpressure

Webhook requests are acknowledged after signature validation, duplicate-delivery recording, and enqueueing into the bounded async processor.

- Accepted events return `202 Accepted` and are processed by background workers.
- Duplicate deliveries return `200 OK` and do not enqueue duplicate work.
- If the queue is full, the request returns `503 Service Unavailable` and increments `promgithub_event_queue_dropped_total{event_type="<event>"}`.
- Processing panics are recovered, logged, and exposed via `promgithub_event_processing_failures_total`; workers continue handling later events.
- On graceful termination, the processor stops accepting new events and drains accepted in-flight/queued events before exit.

Watch `promgithub_event_queue_depth`, `promgithub_event_queue_capacity`, `promgithub_event_worker_count`, `promgithub_event_processed_total`, `promgithub_event_queue_dropped_total`, `promgithub_event_unsupported_total`, and `promgithub_event_processing_failures_total` to tune worker and queue settings.

## Running the service

### Run the binary
Expand Down
4 changes: 2 additions & 2 deletions src/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (p *asyncEventProcessor) Enqueue(ctx context.Context, eventType string, bod
asyncQueueDepthGauge.Set(float64(len(p.queue)))
return nil
default:
asyncEventsDroppedCounter.WithLabelValues(eventType, "queue_full").Inc()
asyncQueueDroppedCounter.WithLabelValues(eventType).Inc()
asyncQueueDepthGauge.Set(float64(len(p.queue)))
return fmt.Errorf("event queue is full")
}
Expand All @@ -117,7 +117,7 @@ func (p *asyncEventProcessor) runWorker(workerID int) {

processor, ok := p.processFn[event.eventType]
if !ok {
asyncEventsDroppedCounter.WithLabelValues(event.eventType, "unsupported_event").Inc()
asyncUnsupportedEventsCounter.WithLabelValues(event.eventType).Inc()
continue
}

Expand Down
7 changes: 4 additions & 3 deletions src/async_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (

func TestAsyncProcessorEnqueueAndProcess(t *testing.T) {
asyncProcessedEventsCounter.Reset()
asyncEventsDroppedCounter.Reset()
asyncQueueDroppedCounter.Reset()
asyncUnsupportedEventsCounter.Reset()
asyncProcessingFailuresCounter.Reset()
asyncQueueDepthGauge.Set(0)
asyncQueueCapacityGauge.Set(0)
Expand Down Expand Up @@ -51,7 +52,7 @@ func TestAsyncProcessorEnqueueAndProcess(t *testing.T) {
}

func TestAsyncProcessorDropsWhenQueueFull(t *testing.T) {
asyncEventsDroppedCounter.Reset()
asyncQueueDroppedCounter.Reset()
asyncQueueDepthGauge.Set(0)

blocker := make(chan struct{})
Expand All @@ -72,7 +73,7 @@ func TestAsyncProcessorDropsWhenQueueFull(t *testing.T) {
t.Fatal("expected queue full error")
}

if got := testutil.ToFloat64(asyncEventsDroppedCounter.WithLabelValues("workflow_run", "queue_full")); got != 1 {
if got := testutil.ToFloat64(asyncQueueDroppedCounter.WithLabelValues("workflow_run")); got != 1 {
t.Fatalf("expected dropped counter to be 1, got %v", got)
}
}
3 changes: 2 additions & 1 deletion src/github_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ func resetWebhookTestState() {
commitPushedCounter.Reset()
pullRequestCounter.Reset()
asyncProcessedEventsCounter.Reset()
asyncEventsDroppedCounter.Reset()
asyncQueueDroppedCounter.Reset()
asyncUnsupportedEventsCounter.Reset()
asyncProcessingFailuresCounter.Reset()
asyncProcessingDurationHistogram.Reset()
duplicateDeliveriesSeenCounter.Reset()
Expand Down
152 changes: 148 additions & 4 deletions src/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package main

import (
"bytes"
"context"
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
Expand All @@ -13,6 +14,7 @@ import (
"net/http/httptest"
"os"
"strings"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -120,10 +122,13 @@ func TestIntegrationWebhookUnsupportedEvent(t *testing.T) {
t.Fatalf("expected status %d, got %d", http.StatusAccepted, resp.StatusCode)
}

metrics := mustFetchMetrics(t, server.URL)
metrics := waitForMetricsSubstring(t, server.URL, `promgithub_event_unsupported_total{event_type="unknown_event"} 1`)
if strings.Contains(metrics, `promgithub_workflow_status{branch="main",conclusion="success",repository="user/repo",workflow_name="CI",workflow_status="completed"} 1`) {
t.Fatalf("unsupported event unexpectedly updated workflow metrics:\n%s", metrics)
}
if !strings.Contains(metrics, `promgithub_event_unsupported_total{event_type="unknown_event"} 1`) {
t.Fatalf("expected unsupported event metric, got:\n%s", metrics)
}
}

func TestIntegrationHealthAndMetricsEndpoints(t *testing.T) {
Expand Down Expand Up @@ -183,16 +188,144 @@ func TestIntegrationDuplicateDeliveryDoesNotInflateMetrics(t *testing.T) {
}
}

// TestIntegrationAsyncQueueFullReturnsUnavailableAndExposesQueueDropMetrics
// verifies that, with a single worker and a queue of one, a third concurrent
// webhook is rejected with 503 and counted in the queue-drop metric.
func TestIntegrationAsyncQueueFullReturnsUnavailableAndExposesQueueDropMetrics(t *testing.T) {
	server := newIntegrationTestServerWithAsyncConfig(t, asyncProcessorConfig{WorkerCount: 1, QueueSize: 1})
	defer server.Close()

	processingStarted := make(chan struct{})
	releaseWorker := make(chan struct{})
	closeOnce := func(ch chan struct{}) {
		select {
		case <-ch:
		default:
			close(ch)
		}
	}
	// Block the single worker so the first event pins it and the second
	// event fills the one-slot queue.
	eventProcessor.processFn["workflow_run"] = func(_ context.Context, _ []byte) {
		closeOnce(processingStarted)
		<-releaseWorker
	}
	t.Cleanup(func() { closeOnce(releaseWorker) })

	payload := mustReadFixture(t, "workflow_run.json")

	// First event: accepted and handed to the (now blocked) worker.
	assertResponseStatus(t, sendWebhookRequest(t, server.URL, "workflow_run", payload, "delivery-queue-full-1"), http.StatusAccepted)

	select {
	case <-processingStarted:
	case <-time.After(2 * time.Second):
		t.Fatal("timed out waiting for first async event to start processing")
	}

	// Second event fills the queue; the third must be turned away.
	assertResponseStatus(t, sendWebhookRequest(t, server.URL, "workflow_run", payload, "delivery-queue-full-2"), http.StatusAccepted)
	assertResponseStatus(t, sendWebhookRequest(t, server.URL, "workflow_run", payload, "delivery-queue-full-3"), http.StatusServiceUnavailable)

	metrics := waitForMetricsSubstring(t, server.URL, `promgithub_event_queue_dropped_total{event_type="workflow_run"} 1`)
	if !strings.Contains(metrics, `promgithub_event_queue_dropped_total{event_type="workflow_run"} 1`) {
		t.Fatalf("expected queue-full drop metric, got:\n%s", metrics)
	}
}

// TestIntegrationAsyncProcessingFailureIsVisibleAndWorkerContinues verifies
// that a panic inside async processing is surfaced via the failure metric and
// that the worker keeps handling subsequent events.
func TestIntegrationAsyncProcessingFailureIsVisibleAndWorkerContinues(t *testing.T) {
	server := newIntegrationTestServer(t)
	defer server.Close()

	var calls atomic.Int32
	eventProcessor.processFn["workflow_run"] = func(ctx context.Context, body []byte) {
		// The first delivery panics; every later one is processed normally.
		if calls.Add(1) == 1 {
			panic("synthetic async processor failure")
		}
		updateWorkflowMetrics(ctx, body)
	}

	payload := mustReadFixture(t, "workflow_run.json")

	assertResponseStatus(t, sendWebhookRequest(t, server.URL, "workflow_run", payload, "delivery-failure-1"), http.StatusAccepted)

	failureMetric := `promgithub_event_processing_failures_total{event_type="workflow_run"} 1`
	metrics := waitForMetricsSubstring(t, server.URL, failureMetric)
	if !strings.Contains(metrics, failureMetric) {
		t.Fatalf("expected async processing failure metric, got:\n%s", metrics)
	}

	assertResponseStatus(t, sendWebhookRequest(t, server.URL, "workflow_run", payload, "delivery-failure-2"), http.StatusAccepted)

	metrics = waitForMetricsSubstring(t, server.URL, `promgithub_workflow_status{branch="main",conclusion="success",repository="user/repo",workflow_name="CI",workflow_status="completed"} 1`)
	if !strings.Contains(metrics, `promgithub_event_processed_total{event_type="workflow_run"} 1`) {
		t.Fatalf("expected worker to continue and process the second event, got:\n%s", metrics)
	}
}

// TestIntegrationAsyncShutdownDrainsQueuedEvents verifies that Stop drains
// both the in-flight event and the queued event before returning.
func TestIntegrationAsyncShutdownDrainsQueuedEvents(t *testing.T) {
	server := newIntegrationTestServerWithAsyncConfig(t, asyncProcessorConfig{WorkerCount: 1, QueueSize: 2})
	defer server.Close()

	processingStarted := make(chan struct{})
	releaseWorker := make(chan struct{})
	closeOnce := func(ch chan struct{}) {
		select {
		case <-ch:
		default:
			close(ch)
		}
	}
	eventProcessor.processFn["workflow_run"] = func(ctx context.Context, body []byte) {
		closeOnce(processingStarted)
		<-releaseWorker
		updateWorkflowMetrics(ctx, body)
	}
	t.Cleanup(func() { closeOnce(releaseWorker) })

	payload := mustReadFixture(t, "workflow_run.json")

	assertResponseStatus(t, sendWebhookRequest(t, server.URL, "workflow_run", payload, "delivery-shutdown-1"), http.StatusAccepted)

	select {
	case <-processingStarted:
	case <-time.After(2 * time.Second):
		t.Fatal("timed out waiting for first async event to start processing")
	}

	// One event is being processed, one sits in the queue.
	assertResponseStatus(t, sendWebhookRequest(t, server.URL, "workflow_run", payload, "delivery-shutdown-2"), http.StatusAccepted)

	// Release the worker, then stop the processor; Stop must drain both events.
	close(releaseWorker)
	eventProcessor.Stop()
	eventProcessor = nil

	metrics := mustFetchMetrics(t, server.URL)
	if !strings.Contains(metrics, `promgithub_event_processed_total{event_type="workflow_run"} 2`) {
		t.Fatalf("expected shutdown to drain both accepted events, got:\n%s", metrics)
	}
}

func newIntegrationTestServer(t *testing.T) *httptest.Server {
t.Helper()
return newIntegrationTestServerWithAsyncConfig(t, asyncProcessorConfig{WorkerCount: 1, QueueSize: 8})
}

func newIntegrationTestServerWithAsyncConfig(t *testing.T, cfg asyncProcessorConfig) *httptest.Server {
t.Helper()
resetIntegrationTestMetrics()

githubWebhookSecret = []byte("integration-test-secret")
stateStore = newInMemoryStateStore()
eventProcessor = newAsyncEventProcessor(asyncProcessorConfig{WorkerCount: 1, QueueSize: 8}, zap.NewNop())
eventProcessor = newAsyncEventProcessor(cfg, zap.NewNop())
eventProcessor.Start()
t.Cleanup(func() {
eventProcessor.Stop()
if eventProcessor != nil {
eventProcessor.Stop()
}
eventProcessor = nil
stateStore = nil
})
Expand All @@ -201,6 +334,16 @@ func newIntegrationTestServer(t *testing.T) *httptest.Server {
return httptest.NewServer(router)
}

func assertResponseStatus(t *testing.T, resp *http.Response, expectedStatus int) {
t.Helper()
defer func() { _ = resp.Body.Close() }()

if resp.StatusCode != expectedStatus {
body, _ := io.ReadAll(resp.Body)
t.Fatalf("expected status %d, got %d with body %q", expectedStatus, resp.StatusCode, strings.TrimSpace(string(body)))
}
}

func resetIntegrationTestMetrics() {
workflowStatusCounter.Reset()
workflowDurationHistogram.Reset()
Expand All @@ -215,7 +358,8 @@ func resetIntegrationTestMetrics() {
commitPushedCounter.Reset()
pullRequestCounter.Reset()
asyncProcessedEventsCounter.Reset()
asyncEventsDroppedCounter.Reset()
asyncQueueDroppedCounter.Reset()
asyncUnsupportedEventsCounter.Reset()
asyncProcessingFailuresCounter.Reset()
asyncProcessingDurationHistogram.Reset()
duplicateDeliveriesSeenCounter.Reset()
Expand Down
16 changes: 12 additions & 4 deletions src/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,20 @@ var (
[]string{"event_type"},
)

asyncEventsDroppedCounter = promauto.NewCounterVec(
asyncQueueDroppedCounter = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "promgithub_event_dropped_total",
Help: "Total number of webhook events dropped before processing",
Name: "promgithub_event_queue_dropped_total",
Help: "Total number of webhook events dropped because the async processing queue was full",
},
[]string{"event_type", "reason"},
[]string{"event_type"},
)

asyncUnsupportedEventsCounter = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "promgithub_event_unsupported_total",
Help: "Total number of unsupported webhook events received by the async processor",
},
[]string{"event_type"},
)

asyncProcessingFailuresCounter = promauto.NewCounterVec(
Expand Down
6 changes: 2 additions & 4 deletions src/redis_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,8 @@ func TestRedisIntegrationDuplicateDeliverySharedAcrossServers(t *testing.T) {
}
_ = resp.Body.Close()

metrics := waitForMetricsSubstring(t, serverA.URL, `promgithub_duplicate_deliveries_seen_total{event_type="workflow_run"} 1`)
if !strings.Contains(metrics, `promgithub_workflow_status{branch="main",conclusion="success",repository="user/repo",workflow_name="CI",workflow_status="completed"} 1`) {
t.Fatalf("expected workflow metric to be recorded once, got:\n%s", metrics)
}
waitForMetricsSubstring(t, serverA.URL, `promgithub_duplicate_deliveries_seen_total{event_type="workflow_run"} 1`)
metrics := waitForMetricsSubstring(t, serverA.URL, `promgithub_workflow_status{branch="main",conclusion="success",repository="user/repo",workflow_name="CI",workflow_status="completed"} 1`)
if !strings.Contains(metrics, `promgithub_duplicate_deliveries_dropped_total{event_type="workflow_run"} 1`) {
t.Fatalf("expected duplicate delivery to be dropped by shared Redis state, got:\n%s", metrics)
}
Expand Down
Loading