-
Notifications
You must be signed in to change notification settings - Fork 2
feat(encryption): Stage 6D-3 — CapabilityFanout helper (Voters ∪ Learners) #793
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
862e2fd
b46d1f5
f2bbdaa
438482f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,352 @@ | ||
| package admin | ||
|
|
||
| import ( | ||
| "context" | ||
| "errors" | ||
| "strconv" | ||
| "sync" | ||
| "time" | ||
|
|
||
| pb "github.com/bootjp/elastickv/proto" | ||
| pkgerrors "github.com/cockroachdb/errors" | ||
| ) | ||
|
|
||
// Sentinel errors for the capability fan-out helper. Each one is a
// plain errors.New value that the code in this file wraps with extra
// detail, so callers can match on the sentinel with errors.Is instead
// of parsing message text.
var (
	// errCapabilityFanoutBadInput is wrapped by the input-validation
	// refusals in CapabilityFanout (nil dial func, non-positive
	// timeout). The wrapping call supplies the concrete reason.
	errCapabilityFanoutBadInput = errors.New("capability fan-out: bad input")

	// errCapabilityFanoutMalformedMember is placed on the Err field of
	// a verdict whose RouteMember arrived without an Address. The
	// verdict is still emitted so the aggregate fails closed
	// (OK=false) and the operator can name the misconfigured member,
	// rather than the row being dropped and OK=true slipping through
	// against a peer that was never probed.
	errCapabilityFanoutMalformedMember = errors.New("capability fan-out: malformed route member")

	// errCapabilityFanoutBadDialer marks a DialFunc that broke its
	// contract by returning a nil client together with a nil error.
	// Without this guard the probe would panic when calling
	// GetCapability on the nil client instead of yielding a
	// Reachable=false verdict.
	errCapabilityFanoutBadDialer = errors.New("capability fan-out: dialer returned nil client without error")

	// errCapabilityFanoutMismatchedResponder covers the stale-routing /
	// shared-address case: the snapshot expected one full_node_id at
	// an address, but the node that answered reported a different one.
	// Accepting that answer would credit the expected member as
	// verified although it never responded, so the verdict carries
	// this error and stays Reachable=false.
	errCapabilityFanoutMismatchedResponder = errors.New("capability fan-out: responder full_node_id does not match expected")

	// errCapabilityFanoutProbeTimeout is pre-seeded on every verdict
	// slot before the probe goroutines start. A slot that has not been
	// overwritten by the time the context ends keeps this error, which
	// lets the helper return on time (§4.3 "Returns within timeout")
	// even when a DialFunc or client ignores ctx cancellation.
	errCapabilityFanoutProbeTimeout = errors.New("capability fan-out: probe did not complete within timeout")
)
|
|
||
// RouteMember is one peer entry in a Raft group. The fan-out helper
// dials Address and identifies the node by FullNodeID for dedup
// across groups (a node serving multiple groups is probed once).
type RouteMember struct {
	// FullNodeID is the dedup identity. A zero value means
	// "unidentified": such a member is keyed by Address instead, and
	// the responder-identity check in probeCapability is skipped.
	FullNodeID uint64
	// Address is the string handed to the DialFunc. An empty Address
	// is not dialed; the member gets a Reachable=false verdict
	// carrying errCapabilityFanoutMalformedMember.
	Address string
}
|
|
||
// RouteGroup is one Raft group's membership. Voters and Learners are
// kept separate at the input level so the cutover RPC handler can
// log them distinctly, but CapabilityFanout treats them identically
// per the §4.1 contract "every (voter ∪ learner) of every Raft
// group". The 6D design pins that learner unreachability is a hard
// no the same way voter unreachability is — see §8 / row "One
// learner unreachable during fan-out".
type RouteGroup struct {
	// GroupID identifies the Raft group. The fan-out code visible in
	// this file does not read it; dedup is per member, not per group.
	GroupID uint64
	// Voters are folded into the dedup set before Learners of the
	// same group.
	Voters []RouteMember
	// Learners are probed with exactly the same rules as Voters.
	Learners []RouteMember
}
|
|
||
// RouteSnapshot is the input the cutover RPC handler builds from
// the Raft engine's membership view and passes to CapabilityFanout.
// Independent of the route-catalog `distribution.CatalogSnapshot`,
// which only carries shard→group mappings, not per-group membership.
type RouteSnapshot struct {
	// Groups lists every Raft group whose members must be probed. A
	// snapshot that yields no members at all makes CapabilityFanout
	// return OK=false with an empty verdict list.
	Groups []RouteGroup
}
|
|
||
// CapabilityVerdict is one node's per-call outcome. Reachable=false
// means no trustworthy answer was obtained from the expected peer:
// the dial or the RPC failed or timed out, the route member was
// malformed, the dialer returned no client, or a different node
// answered. It is not a "no" answer from the peer. The cutover RPC
// handler treats both Reachable=false and EncryptionCapable=false as
// hard refusals per the §8 failure-modes table, but the verdict
// separates them so the operator-facing error message can name the
// precise reason.
type CapabilityVerdict struct {
	// FullNodeID starts as the id expected by the route snapshot and
	// is replaced by the responder's reported id when that is non-zero.
	FullNodeID uint64
	// EncryptionCapable, BuildSHA and SidecarPresent are copied from
	// the GetCapability response; they keep their zero values when
	// Reachable is false.
	EncryptionCapable bool
	BuildSHA          string
	SidecarPresent    bool
	// Reachable is true only when GetCapability succeeded and the
	// responder-identity check passed.
	Reachable bool
	// Err explains a Reachable=false verdict; it is nil on success.
	Err error
}
|
|
||
// CapabilityFanoutResult is the aggregated outcome. OK is true iff
// there is at least one verdict and every verdict has
// Reachable && EncryptionCapable — there is no partial-success mode
// per §4.3.
//
// Named CapabilityFanoutResult rather than the design-doc's
// `FanoutResult` to avoid a collision with the unrelated
// `admin.FanoutResult` in `keyviz_fanout.go` (KeyViz cluster fan-out
// shipped earlier in the same package).
type CapabilityFanoutResult struct {
	// Verdicts holds one entry per deduplicated member, in
	// unspecified order.
	Verdicts []CapabilityVerdict
	// OK is the fail-closed aggregate over Verdicts; an empty
	// Verdicts list yields OK=false.
	OK bool
}
|
|
||
// DialFunc opens a connection to one node's admin endpoint and
// returns an EncryptionAdmin client plus a cleanup closure. The
// helper invokes the closure exactly once per successful dial,
// regardless of how the RPC subsequently resolved. A nil closure is
// tolerated and simply not called. On a non-nil error the helper
// ignores both the client and the closure.
//
// Contract notes enforced by the helper: returning a nil client with
// a nil error is treated as a dial failure
// (errCapabilityFanoutBadDialer), and implementations are expected to
// honor ctx cancellation — the helper returns on time either way, but
// a probe that ignores ctx keeps its goroutine alive until it
// unblocks.
//
// The 6D design says "DialFunc reuses the existing admin connection
// pool" — the concrete implementation will reach for whatever
// connection-pool helper the caller already holds (e.g. the
// `internal/admin.ForwardClient` connection pool for TLS-aware
// dials).
type DialFunc func(ctx context.Context, address string) (pb.EncryptionAdminClient, func(), error)
|
|
||
| // CapabilityFanout fans GetCapability out to every unique | ||
| // (voter ∪ learner) of every group in routes. Concurrent; bounded | ||
| // by timeout regardless of how many members respond. Missing | ||
| // responses surface as Reachable=false verdicts (no partial-success | ||
| // mode — see §4.3). | ||
| // | ||
| // Dedup key: FullNodeID. A node serving multiple groups is probed | ||
| // exactly once. Members with FullNodeID=0 are treated as distinct | ||
| // dedup keys per unique address; this case appears in stub catalogs | ||
| // where the dedup-by-id contract has not been satisfied — the | ||
| // helper still completes by falling back to address-based identity | ||
| // rather than silently collapsing every zero-id row into one probe. | ||
| // | ||
| // Returns (result, nil) on every input. The error slot is reserved | ||
| // for input validation failures (zero-member snapshot, etc.) so | ||
| // callers can keep their existing `err != nil → refuse` shape. | ||
| func CapabilityFanout( | ||
| ctx context.Context, | ||
| routes RouteSnapshot, | ||
| dial DialFunc, | ||
| timeout time.Duration, | ||
| ) (CapabilityFanoutResult, error) { | ||
| if dial == nil { | ||
| return CapabilityFanoutResult{}, pkgerrors.Wrap(errCapabilityFanoutBadInput, "dial func is nil") | ||
| } | ||
| if timeout <= 0 { | ||
| return CapabilityFanoutResult{}, pkgerrors.Wrapf(errCapabilityFanoutBadInput, "timeout must be positive, got %v", timeout) | ||
| } | ||
|
|
||
| dedupKeys := buildCapabilityFanoutDedupSet(routes) | ||
| if len(dedupKeys) == 0 { | ||
| return CapabilityFanoutResult{Verdicts: []CapabilityVerdict{}, OK: false}, nil | ||
| } | ||
|
|
||
| fanCtx, cancel := context.WithTimeout(ctx, timeout) | ||
| defer cancel() | ||
|
|
||
| results := runCapabilityFanoutProbes(fanCtx, dedupKeys, dial) | ||
| return CapabilityFanoutResult{Verdicts: results, OK: capabilityFanoutAllOK(results)}, nil | ||
| } | ||
|
|
||
| // buildCapabilityFanoutDedupSet folds every voter ∪ learner of | ||
| // every group into the dedup map keyed by FullNodeID (or address | ||
| // for unidentified rows; a synthetic key for fully-malformed rows | ||
| // with neither FullNodeID nor Address — see capabilityFanoutDedupKey). | ||
| // Pulled out of CapabilityFanout to keep the orchestration body | ||
| // under the cyclomatic-complexity budget. The size hint is the | ||
| // upper bound on cardinality (every row distinct); the map shrinks | ||
| // at no cost when dedup collapses entries. | ||
| func buildCapabilityFanoutDedupSet(routes RouteSnapshot) map[string]RouteMember { | ||
| upperBound := 0 | ||
| for _, group := range routes.Groups { | ||
| upperBound += len(group.Voters) + len(group.Learners) | ||
| } | ||
| dedupKeys := make(map[string]RouteMember, upperBound) | ||
| for _, group := range routes.Groups { | ||
| for _, m := range group.Voters { | ||
| addCapabilityFanoutDedupTarget(dedupKeys, m) | ||
| } | ||
| for _, m := range group.Learners { | ||
| addCapabilityFanoutDedupTarget(dedupKeys, m) | ||
| } | ||
| } | ||
| return dedupKeys | ||
| } | ||
|
|
||
// runCapabilityFanoutProbes dials every dedup target concurrently
// and returns the per-node verdicts in unspecified order (the result
// is built by ranging over a map). Bounded by ctx — when ctx ends
// before a probe goroutine finishes, that target's pre-seeded
// "probe-timeout" verdict surfaces as Reachable=false instead of the
// helper waiting indefinitely for a buggy DialFunc/client that
// ignores ctx cancellation. This pins the §4.3 "Returns within
// `timeout`" contract.
//
// ctx is expected to carry the fan-out deadline (CapabilityFanout
// passes a context.WithTimeout-derived context). Note that the
// pre-seeded error is also what surfaces if ctx ends for another
// reason, e.g. cancellation of the parent.
//
// Note on goroutine cleanup: goroutines whose probes ignore ctx
// continue running until they unblock on their own. The helper
// does NOT promise to reclaim those goroutines — the contract is
// "the HELPER returns within timeout", not "every spawned
// goroutine returns within timeout". A dialer / client that
// ignores ctx is a bug on the caller side; the helper's job is to
// prevent that bug from hanging the admin RPC path.
func runCapabilityFanoutProbes(ctx context.Context, dedupKeys map[string]RouteMember, dial DialFunc) []CapabilityVerdict {
	// Seed one slot per target with the timeout verdict. The map
	// itself is never modified after this loop; only the values the
	// pointers refer to are overwritten.
	slots := make(map[string]*CapabilityVerdict, len(dedupKeys))
	for key, member := range dedupKeys {
		v := CapabilityVerdict{
			FullNodeID: member.FullNodeID,
			Err:        pkgerrors.Wrap(errCapabilityFanoutProbeTimeout, member.Address),
		}
		slots[key] = &v
	}

	// mu guards the verdict values behind slots: probe goroutines
	// write them, and the collection step below reads them.
	var mu sync.Mutex
	var wg sync.WaitGroup
	for key, m := range dedupKeys {
		wg.Add(1)
		// key and m are passed as arguments so each goroutine works on
		// its own copies.
		go func(slotKey string, member RouteMember) {
			defer wg.Done()
			// The probe runs outside the lock; only the final
			// slot overwrite is serialized.
			verdict := probeCapability(ctx, member, dial)
			mu.Lock()
			*slots[slotKey] = verdict
			mu.Unlock()
		}(key, m)
	}

	// Turn "all probes finished" into a channel so it can be raced
	// against ctx below. This waiter goroutine lives as long as the
	// slowest probe.
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	// Return as soon as either every probe has reported or ctx ends.
	select {
	case <-done:
	case <-ctx.Done():
	}

	// Copy the verdicts out under the lock. Probes that finish after
	// this point still write to their slot, but the returned slice
	// holds copies and is unaffected.
	mu.Lock()
	defer mu.Unlock()
	out := make([]CapabilityVerdict, 0, len(slots))
	for _, v := range slots {
		out = append(out, *v)
	}
	return out
}
|
|
||
| func capabilityFanoutAllOK(verdicts []CapabilityVerdict) bool { | ||
| if len(verdicts) == 0 { | ||
| return false | ||
| } | ||
| for _, v := range verdicts { | ||
| if !v.Reachable || !v.EncryptionCapable { | ||
| return false | ||
| } | ||
| } | ||
| return true | ||
| } | ||
|
|
||
| // addCapabilityFanoutDedupTarget folds a member into the dedup map. | ||
| // Fail-closed posture: malformed rows (empty Address) are NOT | ||
| // dropped here — they would silently disappear from Result.Verdicts | ||
| // and let OK=true sneak through despite an unprobed peer. Instead | ||
| // every row is admitted; probeCapability classifies empty-Address | ||
| // members as Reachable=false with errCapabilityFanoutMalformedMember | ||
| // so the cutover RPC's caller-facing error can name the missing | ||
| // address. | ||
| func addCapabilityFanoutDedupTarget(m map[string]RouteMember, member RouteMember) { | ||
| key := capabilityFanoutDedupKey(member, len(m)) | ||
| if _, exists := m[key]; exists { | ||
| return | ||
| } | ||
| m[key] = member | ||
| } | ||
|
|
||
| // capabilityFanoutDedupKey is the dedup key for a route member. | ||
| // | ||
| // - FullNodeID != 0 → "id:<full_node_id>" (the §4.1 dedup contract) | ||
| // - FullNodeID == 0 ∧ Address != "" → "addr:<address>" | ||
| // (stub catalogs where dedup-by-id isn't satisfied still | ||
| // dedupe by address rather than silently collapsing rows) | ||
| // - FullNodeID == 0 ∧ Address == "" → "synthetic:<ordinal>" | ||
| // (a fully-malformed row gets a unique key so two such rows | ||
| // surface as two separate Reachable=false verdicts instead | ||
| // of collapsing into one and hiding the second | ||
| // misconfiguration) | ||
| func capabilityFanoutDedupKey(member RouteMember, ordinal int) string { | ||
| if member.FullNodeID != 0 { | ||
| return "id:" + uint64ToDecimal(member.FullNodeID) | ||
| } | ||
| if member.Address != "" { | ||
| return "addr:" + member.Address | ||
| } | ||
| return "synthetic:" + strconv.Itoa(ordinal) | ||
| } | ||
|
|
||
// uint64ToDecimal returns the base-10 representation of v. It is used
// to build dedup keys and delegates to strconv.FormatUint — strconv is
// already imported by this file — rather than maintaining a
// hand-rolled digit loop.
func uint64ToDecimal(v uint64) string {
	return strconv.FormatUint(v, 10)
}
|
|
||
| func probeCapability(ctx context.Context, member RouteMember, dial DialFunc) CapabilityVerdict { | ||
| verdict := CapabilityVerdict{FullNodeID: member.FullNodeID} | ||
| if member.Address == "" { | ||
| verdict.Err = pkgerrors.Wrapf(errCapabilityFanoutMalformedMember, "full_node_id=%d has empty address", member.FullNodeID) | ||
| return verdict | ||
| } | ||
| client, closer, err := dial(ctx, member.Address) | ||
| if err != nil { | ||
| verdict.Err = pkgerrors.Wrapf(err, "dial %s", member.Address) | ||
| return verdict | ||
| } | ||
| if closer != nil { | ||
| defer closer() | ||
| } | ||
| if client == nil { | ||
| verdict.Err = pkgerrors.Wrapf(errCapabilityFanoutBadDialer, "dial %s", member.Address) | ||
| return verdict | ||
| } | ||
| report, err := client.GetCapability(ctx, &pb.Empty{}) | ||
| if err != nil { | ||
| verdict.Err = pkgerrors.Wrapf(err, "GetCapability %s", member.Address) | ||
| return verdict | ||
| } | ||
| // Mismatched-responder guard: in a stale-routing or | ||
| // shared-address scenario, the responder might report a | ||
| // different full_node_id than the one the snapshot expected. | ||
| // Accepting the response would credit the expected member as | ||
| // verified despite no member-X ever answering. Fail closed. | ||
| if member.FullNodeID != 0 && report.GetFullNodeId() != 0 && report.GetFullNodeId() != member.FullNodeID { | ||
| verdict.Err = pkgerrors.Wrapf(errCapabilityFanoutMismatchedResponder, | ||
| "%s: expected full_node_id=%d got %d", member.Address, member.FullNodeID, report.GetFullNodeId()) | ||
| return verdict | ||
| } | ||
| verdict.Reachable = true | ||
| verdict.EncryptionCapable = report.GetEncryptionCapable() | ||
| verdict.BuildSHA = report.GetBuildSha() | ||
| verdict.SidecarPresent = report.GetSidecarPresent() | ||
| if report.GetFullNodeId() != 0 { | ||
| verdict.FullNodeID = report.GetFullNodeId() | ||
|
Comment on lines
+348
to
+349
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The probe accepts any successful Useful? React with 👍 / 👎. |
||
| } | ||
| return verdict | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If `dial` returns `(nil, closer, nil)`, this line panics on `client.GetCapability(...)` and can crash the admin RPC path instead of producing a `Reachable=false` verdict. Because `DialFunc` is injected and not validated beyond `err`, the helper should fail closed when `client == nil` (treating it like a dial failure) so a bad dialer implementation cannot take down cutover preflight. Useful? React with 👍 / 👎.