Skip to content

Commit 9fe075a

Browse files
Add optin setting to await MinPoolSize population
1 parent 4098fda commit 9fe075a

File tree

2 files changed

+100
-37
lines changed

2 files changed

+100
-37
lines changed

internal/integration/unified/client_entity.go

Lines changed: 92 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"go.mongodb.org/mongo-driver/v2/mongo/options"
2525
"go.mongodb.org/mongo-driver/v2/mongo/readconcern"
2626
"go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore"
27+
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/description"
2728
)
2829

2930
// There are no automated tests for truncation. Given that, setting the
@@ -59,6 +60,8 @@ type clientEntity struct {
5960
ignoredCommands map[string]struct{}
6061
observeSensitiveCommands *bool
6162
numConnsCheckedOut int32
63+
latestDesc event.TopologyDescription
64+
connsPerServer map[string]int
6265

6366
// These should not be changed after the clientEntity is initialized
6467
observedEvents map[monitoringEventType]struct{}
@@ -75,29 +78,6 @@ type clientEntity struct {
7578
logQueue chan orderedLogMessage
7679
}
7780

78-
// awaitMinimumPoolSize waits for the client's connection pool to reach the
79-
// specified minimum size. This is a best effort operation that times out after
80-
// some predefined amount of time to avoid blocking tests indefinitely.
81-
func awaitMinimumPoolSize(ctx context.Context, entity *clientEntity, minPoolSize uint64) error {
82-
// Don't spend longer than 500ms awaiting minPoolSize.
83-
awaitCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
84-
defer cancel()
85-
86-
ticker := time.NewTicker(100 * time.Millisecond)
87-
defer ticker.Stop()
88-
89-
for {
90-
select {
91-
case <-awaitCtx.Done():
92-
return fmt.Errorf("timed out waiting for client to reach minPoolSize")
93-
case <-ticker.C:
94-
if uint64(entity.eventsCount[connectionReadyEvent]) >= minPoolSize {
95-
return nil
96-
}
97-
}
98-
}
99-
}
100-
10181
func newClientEntity(ctx context.Context, em *EntityMap, entityOptions *entityOptions) (*clientEntity, error) {
10282
// The "configureFailPoint" command should always be ignored.
10383
ignoredCommands := map[string]struct{}{
@@ -118,6 +98,7 @@ func newClientEntity(ctx context.Context, em *EntityMap, entityOptions *entityOp
11898
serverDescriptionChangedEventsCount: make(map[serverDescriptionChangedEventInfo]int32),
11999
entityMap: em,
120100
observeSensitiveCommands: entityOptions.ObserveSensitiveCommands,
101+
connsPerServer: make(map[string]int),
121102
}
122103
entity.setRecordEvents(true)
123104

@@ -226,8 +207,15 @@ func newClientEntity(ctx context.Context, em *EntityMap, entityOptions *entityOp
226207
return nil, fmt.Errorf("error creating mongo.Client: %w", err)
227208
}
228209

229-
if entityOptions.AwaitMinPoolSize && clientOpts.MinPoolSize != nil && *clientOpts.MinPoolSize > 0 {
230-
if err := awaitMinimumPoolSize(ctx, entity, *clientOpts.MinPoolSize); err != nil {
210+
if entityOptions.AwaitMinPoolSizeMS != nil && *entityOptions.AwaitMinPoolSizeMS > 0 &&
211+
clientOpts.MinPoolSize != nil && *clientOpts.MinPoolSize > 0 {
212+
213+
if err := func() error {
214+
awaitCtx, cancel := context.WithTimeout(ctx, time.Duration(*entityOptions.AwaitMinPoolSizeMS)*time.Millisecond)
215+
defer cancel()
216+
217+
return awaitMinimumPoolSize(awaitCtx, entity, *clientOpts.MinPoolSize)
218+
}(); err != nil {
231219
return nil, err
232220
}
233221
}
@@ -476,6 +464,27 @@ func (c *clientEntity) processFailedEvent(_ context.Context, evt *event.CommandF
476464
}
477465
}
478466

467+
func (c *clientEntity) resetEventHistory() {
468+
c.eventProcessMu.Lock()
469+
defer c.eventProcessMu.Unlock()
470+
471+
c.pooled = nil
472+
c.serverDescriptionChanged = nil
473+
c.serverHeartbeatStartedEvent = nil
474+
c.serverHeartbeatSucceeded = nil
475+
c.serverHeartbeatFailedEvent = nil
476+
c.topologyDescriptionChanged = nil
477+
c.topologyOpening = nil
478+
c.topologyClosed = nil
479+
}
480+
481+
func (c *clientEntity) latestTopology() event.TopologyDescription {
482+
c.eventProcessMu.RLock()
483+
defer c.eventProcessMu.RUnlock()
484+
485+
return c.latestDesc
486+
}
487+
479488
func getPoolEventDocument(evt *event.PoolEvent, eventType monitoringEventType) bson.Raw {
480489
bsonBuilder := bsoncore.NewDocumentBuilder().
481490
AppendString("name", string(eventType)).
@@ -506,12 +515,21 @@ func (c *clientEntity) processPoolEvent(evt *event.PoolEvent) {
506515
return
507516
}
508517

509-
// Update the connection counter. This happens even if we're not storing any events.
518+
// Update the connection counter. This happens even if we're not storing any
519+
// events.
510520
switch evt.Type {
511521
case event.ConnectionCheckedOut:
512522
atomic.AddInt32(&c.numConnsCheckedOut, 1)
513523
case event.ConnectionCheckedIn:
514524
atomic.AddInt32(&c.numConnsCheckedOut, -1)
525+
case event.ConnectionReady:
526+
c.eventProcessMu.Lock()
527+
c.connsPerServer[evt.Address]++
528+
c.eventProcessMu.Unlock()
529+
case event.ConnectionClosed:
530+
c.eventProcessMu.Lock()
531+
c.connsPerServer[evt.Address]--
532+
c.eventProcessMu.Unlock()
515533
}
516534

517535
eventType := monitoringEventTypeFromPoolEvent(evt)
@@ -529,6 +547,20 @@ func (c *clientEntity) processPoolEvent(evt *event.PoolEvent) {
529547
}
530548
}
531549

550+
// connsReady returns the number of ready connections for the given server
551+
// address. If the server is not data-bearing, this method will return -1.
552+
func (c *clientEntity) connsReady(server event.ServerDescription) int {
553+
c.eventProcessMu.RLock()
554+
defer c.eventProcessMu.RUnlock()
555+
556+
if server.Kind == description.ServerKindRSArbiter.String() ||
557+
server.Kind == description.ServerKindRSGhost.String() {
558+
return -1
559+
}
560+
561+
return c.connsPerServer[server.Addr.String()]
562+
}
563+
532564
func (c *clientEntity) processServerDescriptionChangedEvent(evt *event.ServerDescriptionChangedEvent) {
533565
c.eventProcessMu.Lock()
534566
defer c.eventProcessMu.Unlock()
@@ -601,6 +633,8 @@ func (c *clientEntity) processTopologyDescriptionChangedEvent(evt *event.Topolog
601633
return
602634
}
603635

636+
c.latestDesc = evt.NewDescription
637+
604638
if _, ok := c.observedEvents[topologyDescriptionChangedEvent]; ok {
605639
c.topologyDescriptionChanged = append(c.topologyDescriptionChanged, evt)
606640
}
@@ -724,3 +758,35 @@ func evaluateUseMultipleMongoses(clientOpts *options.ClientOptions, useMultipleM
724758
}
725759
return nil
726760
}
761+
762+
// awaitMinimumPoolSize waits for the client's connection pool to reach the
763+
// specified minimum size. This is a best effort operation that times out after
764+
// some predefined amount of time to avoid blocking tests indefinitely.
765+
func awaitMinimumPoolSize(ctx context.Context, entity *clientEntity, minPoolSize uint64) error {
766+
ticker := time.NewTicker(100 * time.Millisecond)
767+
defer ticker.Stop()
768+
769+
for {
770+
select {
771+
case <-ctx.Done():
772+
return fmt.Errorf("timed out waiting for client to reach minPoolSize")
773+
case <-ticker.C:
774+
ready := true
775+
for _, server := range entity.latestTopology().Servers {
776+
if r := entity.connsReady(server); r > 0 && r < int(minPoolSize) {
777+
ready = false
778+
779+
// If any server has less than minPoolSize connections, continue
780+
// waiting.
781+
break
782+
}
783+
}
784+
785+
if ready {
786+
entity.resetEventHistory()
787+
788+
return nil
789+
}
790+
}
791+
}
792+
}

internal/integration/unified/entity.go

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,8 @@ import (
2323
"go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore"
2424
)
2525

26-
var (
27-
// ErrEntityMapOpen is returned when a slice entity is accessed while the EntityMap is open
28-
ErrEntityMapOpen = errors.New("slices cannot be accessed while EntityMap is open")
29-
)
26+
// ErrEntityMapOpen is returned when a slice entity is accessed while the EntityMap is open
27+
var ErrEntityMapOpen = errors.New("slices cannot be accessed while EntityMap is open")
3028

3129
var (
3230
tlsCAFile = os.Getenv("CSFLE_TLS_CA_FILE")
@@ -83,11 +81,10 @@ type entityOptions struct {
8381

8482
ClientEncryptionOpts *clientEncryptionOpts `bson:"clientEncryptionOpts"`
8583

86-
// If true, the unified spec runner must wait for the connection pool to be
87-
// populated for all servers according to the minPoolSize option. If false,
88-
// not specified, or if minPoolSize equals 0, there is no need to wait for any
89-
// specific pool state.
90-
AwaitMinPoolSize bool `bson:"awaitMinPoolSize"`
84+
// Maximum duration (in milliseconds) that the test runner MUST wait for each
85+
// connection pool to be populated with minPoolSize. Any CMAP and SDAM events
86+
// that occur before the pool is populated will be ignored.
87+
AwaitMinPoolSizeMS *int `bson:"awaitMinPoolSizeMS"`
9188
}
9289

9390
func (eo *entityOptions) setHeartbeatFrequencyMS(freq time.Duration) {
@@ -106,7 +103,8 @@ func (eo *entityOptions) setHeartbeatFrequencyMS(freq time.Duration) {
106103
// newCollectionEntityOptions constructs an entity options object for a
107104
// collection.
108105
func newCollectionEntityOptions(id string, databaseID string, collectionName string,
109-
opts *dbOrCollectionOptions) *entityOptions {
106+
opts *dbOrCollectionOptions,
107+
) *entityOptions {
110108
options := &entityOptions{
111109
ID: id,
112110
DatabaseID: databaseID,
@@ -598,7 +596,6 @@ func getKmsCredential(kmsDocument bson.Raw, credentialName string, envVar string
598596
return "", fmt.Errorf("unable to get environment value for %v. Please set the CSFLE environment variable: %v", credentialName, envVar)
599597
}
600598
return os.Getenv(envVar), nil
601-
602599
}
603600

604601
func (em *EntityMap) addClientEncryptionEntity(entityOptions *entityOptions) error {

0 commit comments

Comments
 (0)