Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 94 additions & 26 deletions internal/integration/unified/client_entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"go.mongodb.org/mongo-driver/v2/mongo/options"
"go.mongodb.org/mongo-driver/v2/mongo/readconcern"
"go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore"
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/description"
)

// There are no automated tests for truncation. Given that, setting the
Expand Down Expand Up @@ -59,6 +60,8 @@ type clientEntity struct {
ignoredCommands map[string]struct{}
observeSensitiveCommands *bool
numConnsCheckedOut int32
latestDesc event.TopologyDescription
connsPerServer map[string]int

// These should not be changed after the clientEntity is initialized
observedEvents map[monitoringEventType]struct{}
Expand All @@ -75,29 +78,6 @@ type clientEntity struct {
logQueue chan orderedLogMessage
}

// awaitMinimumPoolSize waits for the client's connection pool to reach the
// specified minimum size. This is a best effort operation that times out after
// some predefined amount of time to avoid blocking tests indefinitely.
func awaitMinimumPoolSize(ctx context.Context, entity *clientEntity, minPoolSize uint64) error {
// Don't spend longer than 500ms awaiting minPoolSize.
awaitCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
defer cancel()

ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

for {
select {
case <-awaitCtx.Done():
return fmt.Errorf("timed out waiting for client to reach minPoolSize")
case <-ticker.C:
if uint64(entity.eventsCount[connectionReadyEvent]) >= minPoolSize {
return nil
}
}
}
}

func newClientEntity(ctx context.Context, em *EntityMap, entityOptions *entityOptions) (*clientEntity, error) {
// The "configureFailPoint" command should always be ignored.
ignoredCommands := map[string]struct{}{
Expand All @@ -118,6 +98,7 @@ func newClientEntity(ctx context.Context, em *EntityMap, entityOptions *entityOp
serverDescriptionChangedEventsCount: make(map[serverDescriptionChangedEventInfo]int32),
entityMap: em,
observeSensitiveCommands: entityOptions.ObserveSensitiveCommands,
connsPerServer: make(map[string]int),
}
entity.setRecordEvents(true)

Expand Down Expand Up @@ -226,8 +207,17 @@ func newClientEntity(ctx context.Context, em *EntityMap, entityOptions *entityOp
return nil, fmt.Errorf("error creating mongo.Client: %w", err)
}

if entityOptions.AwaitMinPoolSize && clientOpts.MinPoolSize != nil && *clientOpts.MinPoolSize > 0 {
if err := awaitMinimumPoolSize(ctx, entity, *clientOpts.MinPoolSize); err != nil {
if entityOptions.AwaitMinPoolSizeMS != nil && *entityOptions.AwaitMinPoolSizeMS > 0 &&
clientOpts.MinPoolSize != nil && *clientOpts.MinPoolSize > 0 {

if err := func() error {
awaitDur := time.Duration(*entityOptions.AwaitMinPoolSizeMS) * time.Millisecond

awaitCtx, cancel := context.WithTimeout(ctx, awaitDur)
defer cancel()

return awaitMinimumPoolSize(awaitCtx, entity, *clientOpts.MinPoolSize)
}(); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -476,6 +466,27 @@ func (c *clientEntity) processFailedEvent(_ context.Context, evt *event.CommandF
}
}

func (c *clientEntity) resetEventHistory() {
c.eventProcessMu.Lock()
defer c.eventProcessMu.Unlock()

c.pooled = nil
c.serverDescriptionChanged = nil
c.serverHeartbeatStartedEvent = nil
c.serverHeartbeatSucceeded = nil
c.serverHeartbeatFailedEvent = nil
c.topologyDescriptionChanged = nil
c.topologyOpening = nil
c.topologyClosed = nil
}

func (c *clientEntity) latestTopology() event.TopologyDescription {
c.eventProcessMu.RLock()
defer c.eventProcessMu.RUnlock()

return c.latestDesc
}

func getPoolEventDocument(evt *event.PoolEvent, eventType monitoringEventType) bson.Raw {
bsonBuilder := bsoncore.NewDocumentBuilder().
AppendString("name", string(eventType)).
Expand Down Expand Up @@ -506,12 +517,21 @@ func (c *clientEntity) processPoolEvent(evt *event.PoolEvent) {
return
}

// Update the connection counter. This happens even if we're not storing any events.
// Update the connection counter. This happens even if we're not storing any
// events.
switch evt.Type {
case event.ConnectionCheckedOut:
atomic.AddInt32(&c.numConnsCheckedOut, 1)
case event.ConnectionCheckedIn:
atomic.AddInt32(&c.numConnsCheckedOut, -1)
case event.ConnectionReady:
c.eventProcessMu.Lock()
c.connsPerServer[evt.Address]++
c.eventProcessMu.Unlock()
case event.ConnectionClosed:
c.eventProcessMu.Lock()
c.connsPerServer[evt.Address]--
c.eventProcessMu.Unlock()
}

eventType := monitoringEventTypeFromPoolEvent(evt)
Expand All @@ -529,6 +549,20 @@ func (c *clientEntity) processPoolEvent(evt *event.PoolEvent) {
}
}

// connsReady returns the number of ready connections for the given server
// address. If the server is not data-bearing, this method will return -1.
func (c *clientEntity) connsReady(server event.ServerDescription) int {
c.eventProcessMu.RLock()
defer c.eventProcessMu.RUnlock()

if server.Kind == description.ServerKindRSArbiter.String() ||
server.Kind == description.ServerKindRSGhost.String() {
return -1
}

return c.connsPerServer[server.Addr.String()]
}

func (c *clientEntity) processServerDescriptionChangedEvent(evt *event.ServerDescriptionChangedEvent) {
c.eventProcessMu.Lock()
defer c.eventProcessMu.Unlock()
Expand Down Expand Up @@ -601,6 +635,8 @@ func (c *clientEntity) processTopologyDescriptionChangedEvent(evt *event.Topolog
return
}

c.latestDesc = evt.NewDescription

if _, ok := c.observedEvents[topologyDescriptionChangedEvent]; ok {
c.topologyDescriptionChanged = append(c.topologyDescriptionChanged, evt)
}
Expand Down Expand Up @@ -724,3 +760,35 @@ func evaluateUseMultipleMongoses(clientOpts *options.ClientOptions, useMultipleM
}
return nil
}

// awaitMinimumPoolSize waits for the client's connection pool to reach the
// specified minimum size. This is a best effort operation that times out after
// some predefined amount of time to avoid blocking tests indefinitely.
func awaitMinimumPoolSize(ctx context.Context, entity *clientEntity, minPoolSize uint64) error {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return fmt.Errorf("timed out waiting for client to reach minPoolSize")
case <-ticker.C:
ready := true
for _, server := range entity.latestTopology().Servers {
if r := entity.connsReady(server); r >= 0 && r < int(minPoolSize) {
ready = false

// If any server has less than minPoolSize connections, continue
// waiting.
break
}
}

if ready {
entity.resetEventHistory()

return nil
}
}
}
}
19 changes: 8 additions & 11 deletions internal/integration/unified/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@ import (
"go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore"
)

var (
// ErrEntityMapOpen is returned when a slice entity is accessed while the EntityMap is open
ErrEntityMapOpen = errors.New("slices cannot be accessed while EntityMap is open")
)
// ErrEntityMapOpen is returned when a slice entity is accessed while the EntityMap is open
var ErrEntityMapOpen = errors.New("slices cannot be accessed while EntityMap is open")

var (
tlsCAFile = os.Getenv("CSFLE_TLS_CA_FILE")
Expand Down Expand Up @@ -83,11 +81,10 @@ type entityOptions struct {

ClientEncryptionOpts *clientEncryptionOpts `bson:"clientEncryptionOpts"`

// If true, the unified spec runner must wait for the connection pool to be
// populated for all servers according to the minPoolSize option. If false,
// not specified, or if minPoolSize equals 0, there is no need to wait for any
// specific pool state.
AwaitMinPoolSize bool `bson:"awaitMinPoolSize"`
// Maximum duration (in milliseconds) that the test runner MUST wait for each
// connection pool to be populated with minPoolSize. Any CMAP and SDAM events
// that occur before the pool is populated will be ignored.
AwaitMinPoolSizeMS *int `bson:"awaitMinPoolSizeMS"`
}

func (eo *entityOptions) setHeartbeatFrequencyMS(freq time.Duration) {
Expand All @@ -106,7 +103,8 @@ func (eo *entityOptions) setHeartbeatFrequencyMS(freq time.Duration) {
// newCollectionEntityOptions constructs an entity options object for a
// collection.
func newCollectionEntityOptions(id string, databaseID string, collectionName string,
opts *dbOrCollectionOptions) *entityOptions {
opts *dbOrCollectionOptions,
) *entityOptions {
options := &entityOptions{
ID: id,
DatabaseID: databaseID,
Expand Down Expand Up @@ -598,7 +596,6 @@ func getKmsCredential(kmsDocument bson.Raw, credentialName string, envVar string
return "", fmt.Errorf("unable to get environment value for %v. Please set the CSFLE environment variable: %v", credentialName, envVar)
}
return os.Getenv(envVar), nil

}

func (em *EntityMap) addClientEncryptionEntity(entityOptions *entityOptions) error {
Expand Down
Loading