feat(internal): implement Bigtable specific channel pool optimizations #13226
Conversation
sushanb commented on Oct 24, 2025
- The hot path (selecting a connection for an RPC) is highly optimized: the list of connections is stored in an atomic.Value, and load counters are managed with atomic operations (see the sketch after this list).
- The pool automatically detects and evicts the single worst-performing unhealthy connection at a regular interval.
- If the percentage of unhealthy connections exceeds a high-water mark (PoolwideBadThreshPercent), all evictions are suspended to avoid overwhelming the system during a wider service degradation.
- Evictions are rate-limited by a minimum interval (MinEvictionInterval) to ensure stability.
- A ChannelHealthMonitor runs in the background, periodically probing each connection in the pool.
- Probes are performed by sending a PingAndWarm RPC to the Bigtable backend, verifying end-to-end connectivity.
- Connection health is evaluated based on the percentage of failed probes over a sliding time window (WindowDuration). A connection is marked unhealthy if its failure rate exceeds a configurable threshold (FailurePercentThresh).
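The selection path described above is essentially "power of two choices" over a lock-free snapshot of the pool (the test reviewed below is even named TestSelectLeastLoadedRandomOfTwo). The following is a rough, illustrative sketch with assumed type and field names, not the PR's actual code:

```go
package connpool

import (
	"math/rand"
	"sync/atomic"

	"google.golang.org/grpc"
)

// connEntry pairs a connection with an atomic counter of outstanding RPCs.
type connEntry struct {
	conn *grpc.ClientConn
	load atomic.Int64
}

// pool keeps the current slice of connections behind an atomic.Value so the
// hot path can read it without taking any locks.
type pool struct {
	conns atomic.Value // stores []*connEntry
}

// pick chooses the less loaded of two randomly selected connections
// ("power of two choices"), approximating least-loaded selection without
// scanning the whole slice on every RPC.
func (p *pool) pick() *connEntry {
	entries, _ := p.conns.Load().([]*connEntry)
	switch len(entries) {
	case 0:
		return nil
	case 1:
		return entries[0]
	}
	a := entries[rand.Intn(len(entries))]
	b := entries[rand.Intn(len(entries))]
	if b.load.Load() < a.load.Load() {
		return b
	}
	return a
}
```

The caller would increment the chosen entry's load counter before issuing the RPC and decrement it when the RPC completes.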
gemini-code-assist commented
Summary of Changes: This pull request introduces a comprehensive health checking and self-healing mechanism for the Bigtable gRPC connection pool. The primary goal is to enhance the reliability and performance of the client by proactively identifying and replacing unhealthy connections, ensuring that requests are consistently routed to functional channels and improving overall service stability and responsiveness.
Code Review
This pull request introduces a sophisticated health checking mechanism for the gRPC connection pool, which is a great feature for improving the client's resilience. The implementation includes periodic probing of connections using PingAndWarm RPCs, tracking health over a sliding window, and evicting unhealthy connections. My review focuses on a few areas for improvement, including addressing TODOs for performance and correctness (like parallel dialing and graceful connection draining), and restoring test coverage that was reduced during refactoring. Addressing these points will make the implementation more robust and reliable.
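For reviewers new to the sliding-window part of the design, a minimal sketch of how per-connection probe results might be evaluated is shown below; the type and method names are illustrative, not the PR's actual implementation.

```go
package connpool

import (
	"sync"
	"time"
)

type probeResult struct {
	at time.Time
	ok bool
}

// healthTracker keeps recent probe outcomes for one connection.
type healthTracker struct {
	mu      sync.Mutex
	results []probeResult
}

func (h *healthTracker) record(ok bool) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.results = append(h.results, probeResult{at: time.Now(), ok: ok})
}

// unhealthy reports whether the failure rate within the window exceeds
// failurePercentThresh, but only once at least minProbes results exist.
func (h *healthTracker) unhealthy(window time.Duration, minProbes, failurePercentThresh int) bool {
	h.mu.Lock()
	defer h.mu.Unlock()

	cutoff := time.Now().Add(-window)
	kept := h.results[:0]
	failed := 0
	for _, r := range h.results {
		if r.at.Before(cutoff) {
			continue // drop probes that fell out of the sliding window
		}
		kept = append(kept, r)
		if !r.ok {
			failed++
		}
	}
	h.results = kept

	if len(kept) < minProbes {
		return false // not enough data to judge this connection
	}
	return failed*100/len(kept) > failurePercentThresh
}
```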
FYI, I have a TODO to make DynamicChannelScaleup, MetricsExporter, and HealthChecker all part of a BackgroundProcess interface.
A few high-level comments from the first pass:
I added a drainingState atomic.Bool in connEntry.
replaceConnection now takes the actual conn pointer rather than an index, which avoids this bug. Thanks for finding it.
Will refactor.
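To make the drainingState mention above concrete, here is a rough sketch of how a draining flag plus waitForDrainAndClose could fit together; the field names, the polling loop, and the grace period are assumptions, not the PR's code.

```go
package connpool

import (
	"sync/atomic"
	"time"

	"google.golang.org/grpc"
)

// connEntry is a stand-in for the PR's type: a connection plus an
// outstanding-RPC counter and a draining flag.
type connEntry struct {
	conn     *grpc.ClientConn
	load     atomic.Int64
	draining atomic.Bool
}

// markAsDraining flips the flag exactly once; CompareAndSwap makes the
// transition idempotent, so the drain/close path can be triggered from more
// than one place without racing.
func (e *connEntry) markAsDraining() bool {
	return e.draining.CompareAndSwap(false, true)
}

// waitForDrainAndClose waits for in-flight RPCs to finish (or for a grace
// period to expire) before closing the connection. grpc.ClientConn.Close is
// safe to call more than once.
func waitForDrainAndClose(e *connEntry) {
	deadline := time.Now().Add(1 * time.Minute) // assumed grace period
	for e.load.Load() > 0 && time.Now().Before(deadline) {
		time.Sleep(100 * time.Millisecond)
	}
	_ = e.conn.Close()
}
```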
nimf left a comment
Comments from the second pass.
    func TestSelectLeastLoadedRandomOfTwo(t *testing.T) {
        pool := &BigtableChannelPool{}

        // Test empty pool
No action needed, but for next time: this test file's diff on GitHub is pretty hard to review because it shows functions as if they were replaced when they are not. I got a much saner diff locally using --diff-algorithm=patience, but that's extra effort. It may be better to keep comments like this, because they give the diff algorithm additional "anchors" to produce a more readable diff.
        t.Fatalf("RecvMsg failed: %v", err)
    }
    if string(res.GetPayload().GetBody()) != "msg1" {
        t.Errorf("RecvMsg got %q, want %q", string(res.GetPayload().GetBody()), "msg1")
why are we removing this check?
        t.Errorf("Pool size got %d, want %d", pool.Num(), poolSize)
    }
    // Wait for priming goroutines to likely complete
    time.Sleep(100 * time.Millisecond)
No action needed, just some thoughts for future improvements. Here and everywhere in the tests where we use time.Sleep or wait for some goroutine to do its job: such tests are usually prone to flakiness, so we should consider starting to use the testing/synctest package.
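A self-contained example of the pattern being suggested (not the PR's test; the counter goroutine stands in for the priming/health-probe work). Note that testing/synctest is stable in Go 1.25 and was experimental behind GOEXPERIMENT=synctest in Go 1.24, so availability should be checked before adopting it.

```go
package connpool

import (
	"sync/atomic"
	"testing"
	"testing/synctest"
	"time"
)

// TestBackgroundWorkDeterministically shows how a test can wait for
// background goroutines without racing a real 100 ms sleep: inside the
// synctest bubble, time.Sleep uses a fake clock, and synctest.Wait blocks
// until every goroutine in the bubble is durably blocked or finished.
func TestBackgroundWorkDeterministically(t *testing.T) {
	synctest.Test(t, func(t *testing.T) {
		var pings atomic.Int64

		// Stand-in for a priming or health-probe goroutine.
		go func() {
			time.Sleep(50 * time.Millisecond) // fake time inside the bubble
			pings.Add(1)
		}()

		time.Sleep(100 * time.Millisecond) // advances instantly in fake time
		synctest.Wait()                    // wait for the goroutine to finish

		if got := pings.Load(); got != 1 {
			t.Errorf("pings = %d, want 1", got)
		}
	})
}
```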
    // Wait for priming goroutines to likely complete
    time.Sleep(100 * time.Millisecond)

    if fake.getPingCount() < 1 {
    Suggested change:
    -    if fake.getPingCount() < 1 {
    +    if fake.getPingCount() < 5 {
    @@ -0,0 +1,216 @@
    // Copyright 2025 Google LLC
If the filename doesn't end with _test.go, this will be included in the build, right? I suppose we don't want that.
    targetLoad := (dsm.config.AvgLoadLowThreshold + dsm.config.AvgLoadHighThreshold) / 2
    if targetLoad == 0 {
        targetLoad = 1
This will roughly create a new channel for every concurrent RPC. Should we have a more reasonable default here? A targetLoad of 0 can only happen when both AvgLoadLowThreshold and AvgLoadHighThreshold are zero, which seems like a misconfiguration.
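One possible shape for the guard being suggested, as a sketch only (the float64 parameters and the fallback value are assumptions; the real config fields may use different types and a different default):

```go
package connpool

// targetLoadFor computes the midpoint of the two scaling thresholds, but
// treats a non-positive result as a misconfiguration and falls back to an
// assumed default instead of scaling one channel per concurrent RPC.
func targetLoadFor(avgLoadLow, avgLoadHigh float64) float64 {
	const fallbackTargetLoad = 10.0 // placeholder default, not from the PR
	target := (avgLoadLow + avgLoadHigh) / 2
	if target <= 0 {
		return fallbackTargetLoad
	}
	return target
}
```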
    dynamicMonitor, ok := findMonitor[*DynamicScaleMonitor](pool)
    if !ok {
        t.Fatal("Could not find ChannelHealthMonitor in pool")
    Suggested change:
    -    t.Fatal("Could not find ChannelHealthMonitor in pool")
    +    t.Fatal("Could not find DynamicScaleMonitor in pool")
    // 1. Simulate recent scaling
    dynamicMonitor, ok := findMonitor[*DynamicScaleMonitor](pool)
    if !ok {
        t.Fatal("Could not find ChannelHealthMonitor in pool")
    Suggested change:
    -    t.Fatal("Could not find ChannelHealthMonitor in pool")
    +    t.Fatal("Could not find DynamicScaleMonitor in pool")
    stopOnce         sync.Once  // Add sync.Once
    evictionMu       sync.Mutex // Guards lastEvictionTime
    lastEvictionTime time.Time
    evictionDone     chan struct{} // Notification for test
Do we still use this?
    meterProvider metric.MeterProvider
    // OpenTelemetry metric instruments
    outstandingRPCsHistogram         metric.Float64Histogram
    perConnectionErrorCountHistogram metric.Float64Histogram
RPCs and errors are integers. Why do we need these histograms as floats?
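For reference, the OpenTelemetry metric API has integer histogram instruments, so counts could be recorded without converting to float. A hedged sketch, with illustrative instrument names rather than the PR's:

```go
package connpool

import (
	"context"

	"go.opentelemetry.io/otel/metric"
)

// newOutstandingRPCsHistogram creates an int64 histogram for a count-valued
// metric; no float conversion is needed at record time.
func newOutstandingRPCsHistogram(meter metric.Meter) (metric.Int64Histogram, error) {
	return meter.Int64Histogram(
		"connection_pool.outstanding_rpcs",
		metric.WithDescription("Outstanding RPCs per connection at sample time"),
		metric.WithUnit("{rpc}"),
	)
}

func recordOutstanding(ctx context.Context, h metric.Int64Histogram, outstanding int64) {
	h.Record(ctx, outstanding) // Record accepts an int64 directly
}
```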
    type BigtableChannelPool struct {
    -    conns []*grpc.ClientConn
    -    load  []int64 // Tracks active requests per connection
    +    conns atomic.Value // Stores []*connEntry
BTW, can this be atomic.Pointer[[]*connEntry]? Then we won't need to typecast.
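A small sketch of what that suggestion looks like; connEntry here is a stand-in for the PR's type. atomic.Pointer gives a typed Load/Store, so the hot path needs no type assertion.

```go
package connpool

import (
	"sync/atomic"

	"google.golang.org/grpc"
)

type connEntry struct {
	conn *grpc.ClientConn
	load atomic.Int64
}

type channelPool struct {
	conns atomic.Pointer[[]*connEntry] // Load returns *[]*connEntry, no cast needed
}

// snapshot returns the current slice of connections, or nil before the first Store.
func (p *channelPool) snapshot() []*connEntry {
	if s := p.conns.Load(); s != nil {
		return *s
	}
	return nil
}

// replaceAll publishes a fresh slice; readers never observe a partial update
// because only the pointer to the slice header is swapped.
func (p *channelPool) replaceAll(entries []*connEntry) {
	p.conns.Store(&entries)
}
```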
mutianf left a comment
only looked at connpool.go and had some questions.
        config.Logger,
        nil,
        btransport.WithHealthCheckConfig(btopt.DefaultHealthCheckConfig()),
        btransport.WithDynamicChannelPool(btopt.DefaultDynamicChannelPoolConfig(defaultBigtableConnPoolSize)),
nit: is it possible to pass in the default channel pool size in one place?
    }

    // DynamicChannelPoolConfig holds the parameters for dynamic channel pool scaling.
    type DynamicChannelPoolConfig struct {
Are we exposing any of these for customers to tune?
    return HealthCheckConfig{
        Enabled:       true,
        ProbeInterval: 30 * time.Second,
        ProbeTimeout:  1 * time.Second,
1 second seems long. Let's make this consistent with Java, where the probe deadline is 500 ms?
        MinProbesForEval:         4,
        FailurePercentThresh:     60,
        PoolwideBadThreshPercent: 70,
        MinEvictionInterval:      1 * time.Minute,
This seems short; maybe let's make it consistent with Java as well, which is 10 minutes?
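If the two values above are aligned with the Java client as suggested, the defaults could look roughly like the sketch below. The struct mirrors the fields visible in the diff; the field types and the package name are assumptions, and the exact Java values should be double-checked.

```go
package btopt

import "time"

// HealthCheckConfig mirrors the fields shown in the diff above; types are
// assumed where the diff does not show them.
type HealthCheckConfig struct {
	Enabled                  bool
	ProbeInterval            time.Duration
	ProbeTimeout             time.Duration
	MinProbesForEval         int
	FailurePercentThresh     int
	PoolwideBadThreshPercent int
	MinEvictionInterval      time.Duration
}

// javaAlignedHealthCheckConfig sketches the defaults with the two values the
// reviewers want aligned with the Java client: a 500 ms probe deadline and a
// 10 minute minimum eviction interval. Everything else is kept as in the diff.
func javaAlignedHealthCheckConfig() HealthCheckConfig {
	return HealthCheckConfig{
		Enabled:                  true,
		ProbeInterval:            30 * time.Second,
		ProbeTimeout:             500 * time.Millisecond,
		MinProbesForEval:         4,
		FailurePercentThresh:     60,
		PoolwideBadThreshPercent: 70,
		MinEvictionInterval:      10 * time.Minute,
	}
}
```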
    defer cancel()

    var p peer.Peer
    _, err := client.PingAndWarm(primeCtx, req, grpc.Peer(&p))
Just double checking: this will set the x-goog-request-params header, right?
    initialConns := make([]*connEntry, connPoolSize)
    for i := 0; i < connPoolSize; i++ {
        select {
        case <-pool.poolCtx.Done():
When will poolCtx be done?
    }

    if worstEntry != nil {
        recordEviction() // Record eviction time *before* replacing. // Record eviction time *before* replacing.
    Suggested change:
    -    recordEviction() // Record eviction time *before* replacing. // Record eviction time *before* replacing.
    +    recordEviction() // Record eviction time *before* replacing.
    if idx == -1 {
        btopt.Debugf(p.logger, "bigtable_connpool: Connection to replace was already removed. Draining it.")
        // thread safe to call waitForDrainAndClose as conn.Close() can be called multiple times.
        go p.waitForDrainAndClose(oldEntry)
Will this ever happen? Wouldn't oldEntry.markAsDraining() mean it should already be drained by something else?
    btopt.Debugf(p.logger, "bigtable_connpool: Replacing connection at index %d\n", idx)

    // Start the graceful shutdown process for the old connection
    go p.waitForDrainAndClose(oldEntry)
This is already done on line 589?
    defer cancel()
    isALTS, err := e.conn.Prime(primeCtx)
    if err != nil {
        btopt.Debugf(p.logger, "bigtable_connpool: failed to prime new connection: %v\n", err)
If the prime request fails, does the connection still get added to the pool?
    var latenciesBoundaries = []float64{0.0, 0.001, 0.002, 0.003, 0.004, 0.005, 0.006, 0.008, 0.01, 0.013, 0.016, 0.02, 0.025, 0.03, 0.04, 0.05, 0.065, 0.08, 0.1, 0.13, 0.16, 0.2, 0.25, 0.3, 0.4, 0.5, 0.65, 0.8, 1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 400.0, 800.0, 1600.0, 3200.0} // max is 53.3 minutes

    // Boundaries for the connection_pool.outstanding_rpcs histogram.
    var outstandingRPCsBoundaries = []float64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 14, 16, 20, 24, 28, 32, 40, 50, 64, 128, 256, 512, 1024}
Are these the same as the Java metric buckets? The buckets should be consistent.