feat(bigtable): add ClientConfigurationManager + EnableSession config #19986
sushanb wants to merge 5 commits
Introduce the ClientConfigurationManager, which polls the service via GetClientConfiguration on a fixed interval and fans the parsed config out to registered listeners. Includes the manager (Start / Close / listener registration / RPC retry / validity-window fallback) and a test suite covering construction, polling, listener fan-out, retry on transient errors, and Close shutdown semantics.

Wire it into Client behind a new ClientConfig.EnableSession flag (off by default). When set, NewClientWithConfig constructs the manager using the data-plane BigtableClient and the instance-scoped request metadata, then starts it; Close stops the manager first so it cannot fire listener callbacks against state that is about to be torn down.

Future session-pool PRs will key off the configuration surface this manager exposes. For now it only polls.

Part of the series of small PRs replacing googleapis#19980.
Code Review
This pull request introduces a ClientConfigurationManager to dynamically poll and manage client configurations from the Bigtable control plane. The review feedback identifies several critical issues: the background polling loop is bound to a potentially short-lived context in Start which could prematurely terminate polling; a race condition in addListener can allow stale configurations to overwrite newer ones; an integer overflow in the exponential backoff calculation (1<<i) can cause a runtime panic in rand.Intn; and the fallback mechanism fails to reset the validity window (m.validUntil), resulting in redundant fallback notifications during prolonged outages.
func (m *ClientConfigurationManager) Start(ctx context.Context) {
	btopt.Debugf(m.logger, "bigtable: starting client configuration manager for instance %q, app profile %q", m.instanceName, m.appProfileId)
	// We need a context for the initial poll.
	m.pollsWG.Add(1)
	go func() {
		defer m.pollsWG.Done()
		pollCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
		defer cancel()
		m.poll(pollCtx)
	}()

	// Start background polling
	go m.pollingLoop(ctx)
}
The background polling loop and the initial poll are currently bound to the lifecycle of the context passed to Start(ctx). In typical usage, the context passed to client initialization (NewClientWithConfig) is short-lived and canceled shortly after client creation. If this context is canceled, the background polling loop will terminate immediately, and the initial poll may be cut short.
To ensure the configuration manager continues to poll for the lifetime of the client, the background polling loop should be run with context.Background(). Similarly, the initial poll should use context.Background() with a timeout to avoid premature cancellation.
Suggested change:
-func (m *ClientConfigurationManager) Start(ctx context.Context) {
-	btopt.Debugf(m.logger, "bigtable: starting client configuration manager for instance %q, app profile %q", m.instanceName, m.appProfileId)
-	// We need a context for the initial poll.
-	m.pollsWG.Add(1)
-	go func() {
-		defer m.pollsWG.Done()
-		pollCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
-		defer cancel()
-		m.poll(pollCtx)
-	}()
-	// Start background polling
-	go m.pollingLoop(ctx)
-}
+// Start begins the polling process.
+func (m *ClientConfigurationManager) Start(ctx context.Context) {
+	btopt.Debugf(m.logger, "bigtable: starting client configuration manager for instance %q, app profile %q", m.instanceName, m.appProfileId)
+	// We need a context for the initial poll.
+	m.pollsWG.Add(1)
+	go func() {
+		defer m.pollsWG.Done()
+		pollCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+		defer cancel()
+		m.poll(pollCtx)
+	}()
+	// Start background polling
+	go m.pollingLoop(context.Background())
+}
Wrong. ctx comes from NewClient, so it is long-lived.
func (m *ClientConfigurationManager) addListener(listener configListener) func() {
	m.mu.Lock()
	id := m.nextListenerID
	m.nextListenerID++
	if m.listeners == nil {
		m.listeners = make(map[int]configListener)
	}
	m.listeners[id] = listener

	cfg := m.currentConfig.Clone()
	seq := m.configSeq
	m.mu.Unlock()

	btopt.Debugf(m.logger, "bigtable: adding configuration listener (id: %d)", id)
	listener(cfg, seq)

	return func() {
		m.mu.Lock()
		delete(m.listeners, id)
		m.mu.Unlock()
	}
}
There is a subtle race condition in addListener where a concurrent poll can complete and notify the newly added listener with a newer configuration sequence number before the initial notification listener(cfg, seq) is executed. This results in the listener receiving the older configuration sequence after the newer one, causing it to silently overwrite the newer configuration with the stale one.
While the test TestManagerNotifyListeners_Race implements a sequence check to handle this, the actual production listeners registered via AddSessionPoolListener and AddSessionLoadListener do not have any sequence checks and are vulnerable to this race.
To prevent this, we can wrap the listener in addListener to enforce monotonic sequence numbers using atomic.Int64.
func (m *ClientConfigurationManager) addListener(listener configListener) func() {
	m.mu.Lock()
	id := m.nextListenerID
	m.nextListenerID++
	var lastSeq atomic.Int64
	lastSeq.Store(-1)
	wrappedListener := func(cfg clientConfig, seq int64) {
		for {
			current := lastSeq.Load()
			if seq <= current {
				return
			}
			if lastSeq.CompareAndSwap(current, seq) {
				listener(cfg, seq)
				return
			}
		}
	}
	if m.listeners == nil {
		m.listeners = make(map[int]configListener)
	}
	m.listeners[id] = wrappedListener
	cfg := m.currentConfig.Clone()
	seq := m.configSeq
	m.mu.Unlock()
	btopt.Debugf(m.logger, "bigtable: adding configuration listener (id: %d)", id)
	wrappedListener(cfg, seq)
	return func() {
		m.mu.Lock()
		delete(m.listeners, id)
		m.mu.Unlock()
	}
}
Fixed in 22f4f8c. The register-then-fire window was indeed racy — wrapped every registered listener in a small monotonicListener guard backed by atomic.Int64 so out-of-order older deliveries are dropped. Listener implementations no longer need their own seq check.
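For readers who want to see the guard in isolation, here is a self-contained sketch of the idea. It is not the code from 22f4f8c: the listener type, monotonic, and deliver are hypothetical names, and the config payload is left out so only the sequence handling remains.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// listener is a hypothetical callback type. The PR's configListener also
// carries a config value, which is omitted here.
type listener func(seq int64)

// monotonic wraps l so that only strictly increasing sequence numbers reach it.
func monotonic(l listener) listener {
	var last atomic.Int64
	last.Store(-1)
	return func(seq int64) {
		for {
			cur := last.Load()
			if seq <= cur {
				return // stale or duplicate delivery: drop it
			}
			if last.CompareAndSwap(cur, seq) {
				l(seq)
				return
			}
			// Another goroutine advanced last; re-check against the new value.
		}
	}
}

// deliver feeds seqs through a guarded listener and returns what got through.
func deliver(seqs ...int64) []int64 {
	var got []int64
	guarded := monotonic(func(seq int64) { got = append(got, seq) })
	for _, s := range seqs {
		guarded(s)
	}
	return got
}

func main() {
	// A poll's seq=2 arrives before the registration-time fire carrying seq=1.
	fmt.Println(deliver(2, 1, 3)) // prints [2 3]
}
```

One caveat with this shape: the compare-and-swap orders which sequence number is claimed, not when the wrapped callback finishes. If two goroutines claim consecutive numbers at nearly the same time, their callbacks can still overlap, so a listener that needs strictly serialized updates would need its own lock around the call.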
if i < maxRetries {
	delay := time.Duration(rand.Intn(1<<i)) * time.Second
	select {
	case <-ctx.Done():
		return
	case <-time.After(delay):
	}
}
If maxRetries is configured to a large value (or if the server returns an extreme value), 1<<i can overflow the signed integer type, resulting in a negative value or zero. Passing a non-positive value to rand.Intn will cause a runtime panic, crashing the client.
Additionally, without a cap, the backoff delay can grow excessively large (e.g., hours or days). Capping the backoff limit at a reasonable maximum (e.g., 30 seconds) prevents both integer overflow panics and excessively long retry delays.
Suggested change:
-if i < maxRetries {
-	delay := time.Duration(rand.Intn(1<<i)) * time.Second
-	select {
-	case <-ctx.Done():
-		return
-	case <-time.After(delay):
-	}
-}
+if i < maxRetries {
+	backoffLimit := 1 << i
+	if backoffLimit > 30 {
+		backoffLimit = 30
+	}
+	delay := time.Duration(rand.Intn(backoffLimit)) * time.Second
+	select {
+	case <-ctx.Done():
+		return
+	case <-time.After(delay):
+	}
+}
Fixed in 22f4f8c. Capped at maxBackoffSeconds = 30 and also treat backoffLimit <= 0 (the overflow case) the same as the cap so rand.Intn can't panic.
// Fall back to default configuration if validity window has expired
if time.Now().After(m.validUntil) {
	btopt.Debugf(m.logger, "bigtable: client configuration validity window expired, falling back to default config")
	m.currentConfig = m.defaultConfig
	m.configSeq++
	seq = m.configSeq
	listeners = make([]configListener, 0, len(m.listeners))
	for _, l := range m.listeners {
		listeners = append(listeners, l)
	}
	cfgToNotify = m.currentConfig
}
When the client configuration validity window expires and the poll fails, the manager falls back to the default configuration. However, because m.validUntil is not reset or updated during this fallback, every subsequent failed poll (which occurs on every polling interval) will repeatedly trigger the fallback block. This results in redundant sequence number increments and unnecessary listener notifications with the same default configuration during a prolonged outage.
Resetting m.validUntil to a far-future time (similar to initialization) upon fallback prevents these redundant notifications.
Suggested change:
-// Fall back to default configuration if validity window has expired
-if time.Now().After(m.validUntil) {
-	btopt.Debugf(m.logger, "bigtable: client configuration validity window expired, falling back to default config")
-	m.currentConfig = m.defaultConfig
-	m.configSeq++
-	seq = m.configSeq
-	listeners = make([]configListener, 0, len(m.listeners))
-	for _, l := range m.listeners {
-		listeners = append(listeners, l)
-	}
-	cfgToNotify = m.currentConfig
-}
+// Fall back to default configuration if validity window has expired
+if time.Now().After(m.validUntil) {
+	btopt.Debugf(m.logger, "bigtable: client configuration validity window expired, falling back to default config")
+	m.currentConfig = m.defaultConfig
+	m.configSeq++
+	seq = m.configSeq
+	m.validUntil = time.Now().Add(time.Hour * 24 * 365 * 100)
+	listeners = make([]configListener, 0, len(m.listeners))
+	for _, l := range m.listeners {
+		listeners = append(listeners, l)
+	}
+	cfgToNotify = m.currentConfig
+}
Fixed in 22f4f8c. Reset m.validUntil to the same far-future sentinel used at construction, so the fallback branch fires exactly once per outage and won't keep bumping configSeq on every interval thereafter.
bigtable.Row is map[string][]ReadItem, so the previous range loop emitted Families in randomized iteration order. The conformance suite compares responses with protocmp.Transform, which treats repeated-field order as significant — so any row with two or more families had roughly a 50% chance of producing a families:[B, A] vs families:[A, B] diff and failing. TestReadRow_NoRetry_CommitInSeparateChunk has been hitting this since the proxy landed in 2022, was reported as googleapis#12621, and got silently closed via an unrelated cross-reference without a code fix. Most recently it flaked on the unrelated PR googleapis#20010 (parallel-prime). Sort family names before iterating so the emitted families match the real server's lex order and the test fixtures. (cherry picked from commit 135e672)
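The fix described in this commit message follows a common Go pattern: collect the map keys, sort them, then iterate over the sorted slice. A minimal sketch follows, where sortedFamilies is a hypothetical helper and the value type is a placeholder rather than the real []ReadItem.

```go
package main

import (
	"fmt"
	"sort"
)

// sortedFamilies returns the keys of a family-keyed map in lexicographic
// order. V stands in for whatever the map holds per family.
func sortedFamilies[V any](row map[string]V) []string {
	names := make([]string, 0, len(row))
	for name := range row {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

func main() {
	row := map[string][]string{"B": {"b1"}, "A": {"a1", "a2"}}
	// Ranging over row directly could visit B before A; this order is stable.
	for _, fam := range sortedFamilies(row) {
		fmt.Println(fam, row[fam])
	}
}
```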
… in ClientConfigurationManager

Lint (golint, was failing CI):
- Add doc comment on exported const MinPollingInterval.
- Add doc-block comment on the loadBalancingStrategy iota block (covers StrategyLeastInFlight / StrategyRandom / StrategyPeakEwma).
- Rename pollingConfig.MaxRpcRetryCount -> MaxRPCRetryCount (initialism).
- Rename appProfileId field + constructor parameter -> appProfileID (initialism).

addListener race (gemini review):
- The register-then-fire window in addListener was racy: a concurrent poll() could fan out seq=N+1 to the newly stored listener BEFORE the registration-time fire (carrying seq=N) ran, leaving the listener stuck on the stale config. Wrap each registered listener in a monotonicListener guard backed by an atomic int64; out-of-order older deliveries are dropped.

Backoff overflow / unbounded delay (gemini review):
- A large MaxRPCRetryCount (whether configured or server-supplied) made 1<<i overflow to <=0, which then panicked rand.Intn. Even without overflow the per-attempt wait could stretch into hours. Cap the shift at maxBackoffSeconds (30s) and treat any <=0 value the same as the cap.

Outage storm (gemini review):
- After the validity window expired and the poll kept failing, every subsequent failed poll re-entered the fallback branch (validUntil was never refreshed), bumping configSeq and re-notifying listeners with the same default configuration on every interval. Reset validUntil to the same far-future sentinel used at construction so the fallback fires exactly once per real outage.
…nfiguration

Adds the session-pool counterpart to pingAndWarmDirectAccessChecker so a future session-pool factory (vprc-integration) can decide Direct Access compatibility from the server-pushed ClientConfiguration response instead of every client probing with PingAndWarm at startup.

Plumbing:
- channelPoolConfig grows a typed Mode field mirroring the proto's ChannelPoolConfiguration.Mode oneof: modeDirectAccessWithFallback (the server default, matches today's behavior), modeDirectAccessOnly, modeCloudPathOnly. parseChannelPoolConfig now switches on the oneof so the server's decision survives into the manager's currentConfig.
- New clientConfigDirectAccessChecker implements DirectAccessChecker by reading channelPoolConfig.Mode from a ClientConfigurationManager. CloudPathOnly short-circuits without dialing; the other two modes dial via the direct-access dialer and adopt the connection. Emits the same direct_access/compatible metric shape as the PingAndWarm checker so observability stays uniform.
- channel_pool_factory.go gets a sibling entry point CreateSessionChannelPool that wires the manager-driven checker. The shared pool-option scaffolding (resolveConnPoolSize, basePoolOpts, standardDialer, buildDirectAccessDialer) is extracted from CreateBigtableChannelPool so both factories stay in lockstep on the non-checker parts of pool construction.

Tests cover modeDirectAccessWithFallback / modeDirectAccessOnly / modeCloudPathOnly (dialer must not be called on CloudPathOnly), dial-failure reporting, and a compile-time guard that clientConfigDirectAccessChecker satisfies DirectAccessChecker so WithDirectAccessChecker accepts it.

Plays well with vprc-integration: identifiers don't collide with anything on that branch, and a session-pool wiring there can call CreateSessionChannelPool(ctx, project, instance, config, manager, ...) in place of CreateBigtableChannelPool to get a manager-driven Direct Access decision.
Summary
- ClientConfigurationManager in bigtable/internal/transport: polls GetClientConfiguration on a fixed interval, parses the response into a typed clientConfig, and fans changes out to registered listeners. Supports RPC retry, validity-window fallback, and Close() shutdown that waits for in-flight polls.
- Test suite covering construction, polling, listener fan-out, retry on transient errors, and Close semantics.
- Wired into Client behind a new ClientConfig.EnableSession flag (off by default). When set, NewClientWithConfig constructs the manager using the data-plane BigtableClient and the instance-scoped request metadata, then starts it; Close stops the manager first so it cannot fire listener callbacks against state that is about to be torn down.

Future session-pool PRs will key off the configuration surface this manager exposes. For now it only polls.
Part of the series of small PRs replacing #19980.
Test plan
- go build ./... in bigtable/
- go vet ./... in bigtable/
- go test -short ./internal/transport/... . in bigtable/ (all pass)