Skip to content

Commit e9a03c9

Browse files
Add optin setting to await MinPoolSize population
1 parent 4098fda commit e9a03c9

File tree

2 files changed

+84
-37
lines changed

2 files changed

+84
-37
lines changed

internal/integration/unified/client_entity.go

Lines changed: 76 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ type clientEntity struct {
5959
ignoredCommands map[string]struct{}
6060
observeSensitiveCommands *bool
6161
numConnsCheckedOut int32
62+
latestTopology event.TopologyDescription
63+
connsPerServer map[string]int
6264

6365
// These should not be changed after the clientEntity is initialized
6466
observedEvents map[monitoringEventType]struct{}
@@ -75,29 +77,6 @@ type clientEntity struct {
7577
logQueue chan orderedLogMessage
7678
}
7779

78-
// awaitMinimumPoolSize waits for the client's connection pool to reach the
79-
// specified minimum size. This is a best effort operation that times out after
80-
// some predefined amount of time to avoid blocking tests indefinitely.
81-
func awaitMinimumPoolSize(ctx context.Context, entity *clientEntity, minPoolSize uint64) error {
82-
// Don't spend longer than 500ms awaiting minPoolSize.
83-
awaitCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
84-
defer cancel()
85-
86-
ticker := time.NewTicker(100 * time.Millisecond)
87-
defer ticker.Stop()
88-
89-
for {
90-
select {
91-
case <-awaitCtx.Done():
92-
return fmt.Errorf("timed out waiting for client to reach minPoolSize")
93-
case <-ticker.C:
94-
if uint64(entity.eventsCount[connectionReadyEvent]) >= minPoolSize {
95-
return nil
96-
}
97-
}
98-
}
99-
}
100-
10180
func newClientEntity(ctx context.Context, em *EntityMap, entityOptions *entityOptions) (*clientEntity, error) {
10281
// The "configureFailPoint" command should always be ignored.
10382
ignoredCommands := map[string]struct{}{
@@ -118,6 +97,7 @@ func newClientEntity(ctx context.Context, em *EntityMap, entityOptions *entityOp
11897
serverDescriptionChangedEventsCount: make(map[serverDescriptionChangedEventInfo]int32),
11998
entityMap: em,
12099
observeSensitiveCommands: entityOptions.ObserveSensitiveCommands,
100+
connsPerServer: make(map[string]int),
121101
}
122102
entity.setRecordEvents(true)
123103

@@ -226,8 +206,15 @@ func newClientEntity(ctx context.Context, em *EntityMap, entityOptions *entityOp
226206
return nil, fmt.Errorf("error creating mongo.Client: %w", err)
227207
}
228208

229-
if entityOptions.AwaitMinPoolSize && clientOpts.MinPoolSize != nil && *clientOpts.MinPoolSize > 0 {
230-
if err := awaitMinimumPoolSize(ctx, entity, *clientOpts.MinPoolSize); err != nil {
209+
if entityOptions.AwaitMinPoolSizeMS != nil && *entityOptions.AwaitMinPoolSizeMS > 0 &&
210+
clientOpts.MinPoolSize != nil && *clientOpts.MinPoolSize > 0 {
211+
212+
if err := func() error {
213+
awaitCtx, cancel := context.WithTimeout(ctx, time.Duration(*entityOptions.AwaitMinPoolSizeMS)*time.Millisecond)
214+
defer cancel()
215+
216+
return awaitMinimumPoolSize(awaitCtx, entity, *clientOpts.MinPoolSize)
217+
}(); err != nil {
231218
return nil, err
232219
}
233220
}
@@ -476,6 +463,17 @@ func (c *clientEntity) processFailedEvent(_ context.Context, evt *event.CommandF
476463
}
477464
}
478465

466+
func (c *clientEntity) resetEventHistory() {
467+
c.pooled = nil
468+
c.serverDescriptionChanged = nil
469+
c.serverHeartbeatStartedEvent = nil
470+
c.serverHeartbeatSucceeded = nil
471+
c.serverHeartbeatFailedEvent = nil
472+
c.topologyDescriptionChanged = nil
473+
c.topologyOpening = nil
474+
c.topologyClosed = nil
475+
}
476+
479477
func getPoolEventDocument(evt *event.PoolEvent, eventType monitoringEventType) bson.Raw {
480478
bsonBuilder := bsoncore.NewDocumentBuilder().
481479
AppendString("name", string(eventType)).
@@ -506,12 +504,21 @@ func (c *clientEntity) processPoolEvent(evt *event.PoolEvent) {
506504
return
507505
}
508506

509-
// Update the connection counter. This happens even if we're not storing any events.
507+
// Update the connection counter. This happens even if we're not storing any
508+
// events.
510509
switch evt.Type {
511510
case event.ConnectionCheckedOut:
512511
atomic.AddInt32(&c.numConnsCheckedOut, 1)
513512
case event.ConnectionCheckedIn:
514513
atomic.AddInt32(&c.numConnsCheckedOut, -1)
514+
case event.ConnectionReady:
515+
c.eventProcessMu.Lock()
516+
c.connsPerServer[evt.Address]++
517+
c.eventProcessMu.Unlock()
518+
case event.ConnectionClosed:
519+
c.eventProcessMu.Lock()
520+
c.connsPerServer[evt.Address]--
521+
c.eventProcessMu.Unlock()
515522
}
516523

517524
eventType := monitoringEventTypeFromPoolEvent(evt)
@@ -529,6 +536,15 @@ func (c *clientEntity) processPoolEvent(evt *event.PoolEvent) {
529536
}
530537
}
531538

539+
// connsReady returns the number of ready connections for the given server
540+
// address.
541+
func (c *clientEntity) connsReady(serverAddr string) int {
542+
c.eventProcessMu.RLock()
543+
defer c.eventProcessMu.RUnlock()
544+
545+
return c.connsPerServer[serverAddr]
546+
}
547+
532548
func (c *clientEntity) processServerDescriptionChangedEvent(evt *event.ServerDescriptionChangedEvent) {
533549
c.eventProcessMu.Lock()
534550
defer c.eventProcessMu.Unlock()
@@ -601,6 +617,8 @@ func (c *clientEntity) processTopologyDescriptionChangedEvent(evt *event.Topolog
601617
return
602618
}
603619

620+
c.latestTopology = evt.NewDescription
621+
604622
if _, ok := c.observedEvents[topologyDescriptionChangedEvent]; ok {
605623
c.topologyDescriptionChanged = append(c.topologyDescriptionChanged, evt)
606624
}
@@ -724,3 +742,35 @@ func evaluateUseMultipleMongoses(clientOpts *options.ClientOptions, useMultipleM
724742
}
725743
return nil
726744
}
745+
746+
// awaitMinimumPoolSize waits for the client's connection pool to reach the
747+
// specified minimum size. This is a best effort operation that times out after
748+
// some predefined amount of time to avoid blocking tests indefinitely.
749+
func awaitMinimumPoolSize(ctx context.Context, entity *clientEntity, minPoolSize uint64) error {
750+
ticker := time.NewTicker(100 * time.Millisecond)
751+
defer ticker.Stop()
752+
753+
for {
754+
select {
755+
case <-ctx.Done():
756+
return fmt.Errorf("timed out waiting for client to reach minPoolSize")
757+
case <-ticker.C:
758+
ready := true
759+
for _, server := range entity.latestTopology.Servers {
760+
if entity.connsReady(server.Addr.String()) < int(minPoolSize) {
761+
ready = false
762+
763+
// If any server has less than minPoolSize connections, continue
764+
// waiting.
765+
break
766+
}
767+
}
768+
769+
if ready {
770+
entity.resetEventHistory()
771+
772+
return nil
773+
}
774+
}
775+
}
776+
}

internal/integration/unified/entity.go

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,8 @@ import (
2323
"go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore"
2424
)
2525

26-
var (
27-
// ErrEntityMapOpen is returned when a slice entity is accessed while the EntityMap is open
28-
ErrEntityMapOpen = errors.New("slices cannot be accessed while EntityMap is open")
29-
)
26+
// ErrEntityMapOpen is returned when a slice entity is accessed while the EntityMap is open
27+
var ErrEntityMapOpen = errors.New("slices cannot be accessed while EntityMap is open")
3028

3129
var (
3230
tlsCAFile = os.Getenv("CSFLE_TLS_CA_FILE")
@@ -83,11 +81,10 @@ type entityOptions struct {
8381

8482
ClientEncryptionOpts *clientEncryptionOpts `bson:"clientEncryptionOpts"`
8583

86-
// If true, the unified spec runner must wait for the connection pool to be
87-
// populated for all servers according to the minPoolSize option. If false,
88-
// not specified, or if minPoolSize equals 0, there is no need to wait for any
89-
// specific pool state.
90-
AwaitMinPoolSize bool `bson:"awaitMinPoolSize"`
84+
// Maximum duration (in milliseconds) that the test runner MUST wait for each
85+
// connection pool to be populated with minPoolSize. Any CMAP and SDAM events
86+
// that occur before the pool is populated will be ignored.
87+
AwaitMinPoolSizeMS *int `bson:"awaitMinPoolSizeMS"`
9188
}
9289

9390
func (eo *entityOptions) setHeartbeatFrequencyMS(freq time.Duration) {
@@ -106,7 +103,8 @@ func (eo *entityOptions) setHeartbeatFrequencyMS(freq time.Duration) {
106103
// newCollectionEntityOptions constructs an entity options object for a
107104
// collection.
108105
func newCollectionEntityOptions(id string, databaseID string, collectionName string,
109-
opts *dbOrCollectionOptions) *entityOptions {
106+
opts *dbOrCollectionOptions,
107+
) *entityOptions {
110108
options := &entityOptions{
111109
ID: id,
112110
DatabaseID: databaseID,
@@ -598,7 +596,6 @@ func getKmsCredential(kmsDocument bson.Raw, credentialName string, envVar string
598596
return "", fmt.Errorf("unable to get environment value for %v. Please set the CSFLE environment variable: %v", credentialName, envVar)
599597
}
600598
return os.Getenv(envVar), nil
601-
602599
}
603600

604601
func (em *EntityMap) addClientEncryptionEntity(entityOptions *entityOptions) error {

0 commit comments

Comments
 (0)