Skip to content

Commit 67ff921

Browse files
committed
Add support to MaxStalenessSeconds in ReadPreference
1 parent 1a9c7a9 commit 67ff921

File tree

3 files changed

+63
-6
lines changed

3 files changed

+63
-6
lines changed

session.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,11 @@ const (
304304
// false: Initiate the connection without TLS/SSL.
305305
// The default value is false.
306306
//
307+
// maxStalenessSeconds=<seconds>
308+
//
309+
// specify a maximum replication lag, or “staleness” in seconds, for reads from secondaries, minimum value allowed is 90.
310+
// Works on MongoDB 3.4+
311+
//
307312
// Relevant documentation:
308313
//
309314
// http://docs.mongodb.org/manual/reference/connection-string/
@@ -353,6 +358,7 @@ func ParseURL(url string) (*DialInfo, error) {
353358
var readPreferenceTagSets []bson.D
354359
minPoolSize := 0
355360
maxIdleTimeMS := 0
361+
maxStalenessSeconds := 0
356362
safe := Safe{}
357363
for _, opt := range uinfo.options {
358364
switch opt.key {
@@ -390,6 +396,17 @@ func ParseURL(url string) (*DialInfo, error) {
390396
if err != nil {
391397
return nil, errors.New("bad value for maxPoolSize: " + opt.value)
392398
}
399+
case "maxStalenessSeconds":
400+
maxStalenessSeconds, err = strconv.Atoi(opt.value)
401+
402+
if err != nil {
403+
return nil, errors.New("bad value for maxStalenessSeconds: " + opt.value)
404+
}
405+
406+
if maxStalenessSeconds > 0 && maxStalenessSeconds < 90 {
407+
return nil, errors.New("maxStalenessSeconds too low " + opt.value + ", must be >= 90 seconds")
408+
}
409+
393410
case "appName":
394411
if len(opt.value) > 128 {
395412
return nil, errors.New("appName too long, must be < 128 bytes: " + opt.value)
@@ -455,6 +472,10 @@ func ParseURL(url string) (*DialInfo, error) {
455472
return nil, errors.New("readPreferenceTagSet may not be specified when readPreference is primary")
456473
}
457474

475+
if readPreferenceMode == Primary && maxStalenessSeconds > 0 {
476+
return nil, errors.New("maxStalenessSeconds may not be specified when readPreference is primary")
477+
}
478+
458479
info := DialInfo{
459480
Addrs: uinfo.addrs,
460481
Direct: direct,
@@ -469,6 +490,8 @@ func ParseURL(url string) (*DialInfo, error) {
469490
ReadPreference: &ReadPreference{
470491
Mode: readPreferenceMode,
471492
TagSets: readPreferenceTagSets,
493+
494+
MaxStalenessSeconds: maxStalenessSeconds,
472495
},
473496
Safe: safe,
474497
ReplicaSetName: setName,
@@ -607,6 +630,8 @@ func (i *DialInfo) Copy() *DialInfo {
607630
if i.ReadPreference != nil {
608631
readPreference = &ReadPreference{
609632
Mode: i.ReadPreference.Mode,
633+
634+
MaxStalenessSeconds: i.ReadPreference.MaxStalenessSeconds,
610635
}
611636
readPreference.TagSets = make([]bson.D, len(i.ReadPreference.TagSets))
612637
copy(readPreference.TagSets, i.ReadPreference.TagSets)
@@ -679,6 +704,9 @@ type ReadPreference struct {
679704
// Mode determines the consistency of results. See Session.SetMode.
680705
Mode Mode
681706

707+
// MaxStalenessSeconds specify a maximum replication lag, or “staleness” in seconds, for reads from secondaries.
708+
MaxStalenessSeconds int
709+
682710
// TagSets indicates which servers are allowed to be used. See Session.SelectServers.
683711
TagSets []bson.D
684712
}
@@ -768,6 +796,7 @@ func DialWithInfo(dialInfo *DialInfo) (*Session, error) {
768796
if info.ReadPreference != nil {
769797
session.SelectServers(info.ReadPreference.TagSets...)
770798
session.SetMode(info.ReadPreference.Mode, true)
799+
session.SetMaxStalenessSeconds(info.ReadPreference.MaxStalenessSeconds)
771800
} else {
772801
session.SetMode(Strong, true)
773802
}
@@ -2190,6 +2219,23 @@ func (s *Session) SetPoolTimeout(timeout time.Duration) {
21902219
s.m.Unlock()
21912220
}
21922221

2222+
// SetMaxStalenessSeconds set the maximum of seconds of replication lag from secondaries
2223+
// Works on MongoDB 3.4+
2224+
//
2225+
// Relevant documentation:
2226+
//
2227+
// https://docs.mongodb.com/manual/core/read-preference/#maxstalenessseconds
2228+
//
2229+
func (s *Session) SetMaxStalenessSeconds(seconds int) error {
2230+
s.m.Lock()
2231+
defer s.m.Unlock()
2232+
if seconds > 0 && seconds < 90 {
2233+
return errors.New("SetMaxStalenessSeconds: minimum of seconds is 90")
2234+
}
2235+
s.queryConfig.op.maxStalenessSeconds = seconds
2236+
return nil
2237+
}
2238+
21932239
// SetBypassValidation sets whether the server should bypass the registered
21942240
// validation expressions executed when documents are inserted or modified,
21952241
// in the interest of preserving invariants in the collection being modified.

session_test.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -167,28 +167,33 @@ func (s *S) TestURLReadPreference(c *C) {
167167
type test struct {
168168
url string
169169
mode mgo.Mode
170+
171+
maxStalenessSeconds int
170172
}
171173

172174
tests := []test{
173-
{"localhost:40001?readPreference=primary", mgo.Primary},
174-
{"localhost:40001?readPreference=primaryPreferred", mgo.PrimaryPreferred},
175-
{"localhost:40001?readPreference=secondary", mgo.Secondary},
176-
{"localhost:40001?readPreference=secondaryPreferred", mgo.SecondaryPreferred},
177-
{"localhost:40001?readPreference=nearest", mgo.Nearest},
175+
{"localhost:40001?readPreference=primary", mgo.Primary, 0},
176+
{"localhost:40001?readPreference=primaryPreferred", mgo.PrimaryPreferred, 0},
177+
{"localhost:40001?readPreference=secondary", mgo.Secondary, 0},
178+
{"localhost:40001?readPreference=secondaryPreferred", mgo.SecondaryPreferred, 0},
179+
{"localhost:40001?readPreference=secondary&maxStalenessSeconds=10", mgo.Secondary, 10},
180+
{"localhost:40001?readPreference=nearest", mgo.Nearest, 0},
178181
}
179182

180183
for _, test := range tests {
181184
info, err := mgo.ParseURL(test.url)
182185
c.Assert(err, IsNil)
183186
c.Assert(info.ReadPreference, NotNil)
184187
c.Assert(info.ReadPreference.Mode, Equals, test.mode)
188+
c.Assert(info.ReadPreference.MaxStalenessSeconds, Equals, test.maxStalenessSeconds)
185189
}
186190
}
187191

188192
func (s *S) TestURLInvalidReadPreference(c *C) {
189193
urls := []string{
190194
"localhost:40001?readPreference=foo",
191195
"localhost:40001?readPreference=primarypreferred",
196+
"localhost:40001?readPreference=primary&maxStalenessSeconds=90",
192197
}
193198
for _, url := range urls {
194199
_, err := mgo.ParseURL(url)

socket.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ type queryOp struct {
8383
hasOptions bool
8484
flags queryOpFlags
8585
readConcern string
86+
87+
maxStalenessSeconds int
8688
}
8789

8890
type queryWrapper struct {
@@ -120,11 +122,15 @@ func (op *queryOp) finalQuery(socket *mongoSocket) interface{} {
120122
panic(fmt.Sprintf("unsupported read mode: %d", op.mode))
121123
}
122124
op.hasOptions = true
123-
op.options.ReadPreference = make(bson.D, 0, 2)
125+
op.options.ReadPreference = make(bson.D, 0, 3)
124126
op.options.ReadPreference = append(op.options.ReadPreference, bson.DocElem{Name: "mode", Value: modeName})
125127
if len(op.serverTags) > 0 {
126128
op.options.ReadPreference = append(op.options.ReadPreference, bson.DocElem{Name: "tags", Value: op.serverTags})
127129
}
130+
131+
if op.maxStalenessSeconds > 0 {
132+
op.options.ReadPreference = append(op.options.ReadPreference, bson.DocElem{Name: "maxStalenessSeconds", Value: op.maxStalenessSeconds})
133+
}
128134
}
129135
if op.hasOptions {
130136
if op.query == nil {

0 commit comments

Comments
 (0)