Skip to content

Commit 268d296

Browse files
committed
Add support to MaxStalenessSeconds in ReadPreference
1 parent 1a9c7a9 commit 268d296

File tree

3 files changed

+46
-6
lines changed

3 files changed

+46
-6
lines changed

session.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,7 @@ func ParseURL(url string) (*DialInfo, error) {
353353
var readPreferenceTagSets []bson.D
354354
minPoolSize := 0
355355
maxIdleTimeMS := 0
356+
maxStalenessSeconds := 0
356357
safe := Safe{}
357358
for _, opt := range uinfo.options {
358359
switch opt.key {
@@ -390,6 +391,11 @@ func ParseURL(url string) (*DialInfo, error) {
390391
if err != nil {
391392
return nil, errors.New("bad value for maxPoolSize: " + opt.value)
392393
}
394+
case "maxStalenessSeconds":
395+
maxStalenessSeconds, err = strconv.Atoi(opt.value)
396+
if err != nil {
397+
return nil, errors.New("bad value for maxStalenessSeconds: " + opt.value)
398+
}
393399
case "appName":
394400
if len(opt.value) > 128 {
395401
return nil, errors.New("appName too long, must be < 128 bytes: " + opt.value)
@@ -469,6 +475,8 @@ func ParseURL(url string) (*DialInfo, error) {
469475
ReadPreference: &ReadPreference{
470476
Mode: readPreferenceMode,
471477
TagSets: readPreferenceTagSets,
478+
479+
MaxStalenessSeconds: maxStalenessSeconds,
472480
},
473481
Safe: safe,
474482
ReplicaSetName: setName,
@@ -607,6 +615,8 @@ func (i *DialInfo) Copy() *DialInfo {
607615
if i.ReadPreference != nil {
608616
readPreference = &ReadPreference{
609617
Mode: i.ReadPreference.Mode,
618+
619+
MaxStalenessSeconds: i.ReadPreference.MaxStalenessSeconds,
610620
}
611621
readPreference.TagSets = make([]bson.D, len(i.ReadPreference.TagSets))
612622
copy(readPreference.TagSets, i.ReadPreference.TagSets)
@@ -679,6 +689,9 @@ type ReadPreference struct {
679689
// Mode determines the consistency of results. See Session.SetMode.
680690
Mode Mode
681691

692+
// MaxStalenessSeconds specify a maximum replication lag, or “staleness” in seconds, for reads from secondaries.
693+
MaxStalenessSeconds int
694+
682695
// TagSets indicates which servers are allowed to be used. See Session.SelectServers.
683696
TagSets []bson.D
684697
}
@@ -768,6 +781,7 @@ func DialWithInfo(dialInfo *DialInfo) (*Session, error) {
768781
if info.ReadPreference != nil {
769782
session.SelectServers(info.ReadPreference.TagSets...)
770783
session.SetMode(info.ReadPreference.Mode, true)
784+
session.SetMaxStalenessSeconds(info.ReadPreference.MaxStalenessSeconds)
771785
} else {
772786
session.SetMode(Strong, true)
773787
}
@@ -2190,6 +2204,22 @@ func (s *Session) SetPoolTimeout(timeout time.Duration) {
21902204
s.m.Unlock()
21912205
}
21922206

2207+
// SetMaxStalenessSeconds set the maximum of seconds of replication lag from secondaries
2208+
//
2209+
// Relevant documentation:
2210+
//
2211+
// https://docs.mongodb.com/manual/core/read-preference/#maxstalenessseconds
2212+
//
2213+
func (s *Session) SetMaxStalenessSeconds(seconds int) {
2214+
s.m.Lock()
2215+
if seconds > -1 {
2216+
s.queryConfig.op.maxStalenessSeconds = seconds
2217+
} else {
2218+
panic("SetMaxStalenessSeconds: support only positive numbers")
2219+
}
2220+
s.m.Unlock()
2221+
}
2222+
21932223
// SetBypassValidation sets whether the server should bypass the registered
21942224
// validation expressions executed when documents are inserted or modified,
21952225
// in the interest of preserving invariants in the collection being modified.

session_test.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -167,21 +167,25 @@ func (s *S) TestURLReadPreference(c *C) {
167167
type test struct {
168168
url string
169169
mode mgo.Mode
170+
171+
maxStalenessSeconds int
170172
}
171173

172174
tests := []test{
173-
{"localhost:40001?readPreference=primary", mgo.Primary},
174-
{"localhost:40001?readPreference=primaryPreferred", mgo.PrimaryPreferred},
175-
{"localhost:40001?readPreference=secondary", mgo.Secondary},
176-
{"localhost:40001?readPreference=secondaryPreferred", mgo.SecondaryPreferred},
177-
{"localhost:40001?readPreference=nearest", mgo.Nearest},
175+
{"localhost:40001?readPreference=primary&maxStalenessSeconds=10", mgo.Primary, 10},
176+
{"localhost:40001?readPreference=primary", mgo.Primary, 0},
177+
{"localhost:40001?readPreference=primaryPreferred", mgo.PrimaryPreferred, 0},
178+
{"localhost:40001?readPreference=secondary", mgo.Secondary, 0},
179+
{"localhost:40001?readPreference=secondaryPreferred", mgo.SecondaryPreferred, 0},
180+
{"localhost:40001?readPreference=nearest", mgo.Nearest, 0},
178181
}
179182

180183
for _, test := range tests {
181184
info, err := mgo.ParseURL(test.url)
182185
c.Assert(err, IsNil)
183186
c.Assert(info.ReadPreference, NotNil)
184187
c.Assert(info.ReadPreference.Mode, Equals, test.mode)
188+
c.Assert(info.ReadPreference.MaxStalenessSeconds, Equals, test.maxStalenessSeconds)
185189
}
186190
}
187191

socket.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ type queryOp struct {
8383
hasOptions bool
8484
flags queryOpFlags
8585
readConcern string
86+
87+
maxStalenessSeconds int
8688
}
8789

8890
type queryWrapper struct {
@@ -120,11 +122,15 @@ func (op *queryOp) finalQuery(socket *mongoSocket) interface{} {
120122
panic(fmt.Sprintf("unsupported read mode: %d", op.mode))
121123
}
122124
op.hasOptions = true
123-
op.options.ReadPreference = make(bson.D, 0, 2)
125+
op.options.ReadPreference = make(bson.D, 0, 3)
124126
op.options.ReadPreference = append(op.options.ReadPreference, bson.DocElem{Name: "mode", Value: modeName})
125127
if len(op.serverTags) > 0 {
126128
op.options.ReadPreference = append(op.options.ReadPreference, bson.DocElem{Name: "tags", Value: op.serverTags})
127129
}
130+
131+
if op.maxStalenessSeconds > 0 {
132+
op.options.ReadPreference = append(op.options.ReadPreference, bson.DocElem{Name: "maxStalenessSeconds", Value: op.maxStalenessSeconds})
133+
}
128134
}
129135
if op.hasOptions {
130136
if op.query == nil {

0 commit comments

Comments
 (0)