Skip to content

Commit 79f2809

Browse files
pooknullhors
andauthored
K8SPSMDB-962: fix memory leak (percona#1295)
* K8SPSMDB-962: fix memory leak https://jira.percona.com/browse/K8SPSMDB-962 * add `TestConnectionLeaks` * golangci-lint: remove unused code --------- Co-authored-by: Viacheslav Sarzhan <[email protected]>
1 parent 07886d3 commit 79f2809

29 files changed

+1085
-256
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ percona-server-mongodb-operator
116116
mongodb-healthcheck
117117

118118
!cmd/percona-server-mongodb-operator
119+
!cmd/mongodb-healthcheck
119120

120121
# End of https://www.gitignore.io/api/go,vim,emacs,visualstudiocode
121122

cmd/mongodb-healthcheck/main.go

+12-7
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@ package main
1717
import (
1818
"context"
1919
"os"
20+
"os/signal"
2021
"strconv"
2122
"strings"
23+
"syscall"
2224

2325
uzap "go.uber.org/zap"
2426
"go.uber.org/zap/zapcore"
@@ -36,6 +38,9 @@ var (
3638
)
3739

3840
func main() {
41+
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, os.Interrupt)
42+
defer stop()
43+
3944
app := tool.New("Performs health and readiness checks for MongoDB", GitCommit, GitBranch)
4045

4146
k8sCmd := app.Command("k8s", "Performs liveness check for MongoDB on Kubernetes")
@@ -77,14 +82,14 @@ func main() {
7782
os.Exit(1)
7883
}
7984

80-
client, err := db.Dial(cnf)
85+
client, err := db.Dial(ctx, cnf)
8186
if err != nil {
8287
log.Error(err, "connection error")
8388
os.Exit(1)
8489
}
8590

8691
defer func() {
87-
if err := client.Disconnect(context.TODO()); err != nil {
92+
if err := client.Disconnect(ctx); err != nil {
8893
log.Error(err, "failed to disconnect")
8994
os.Exit(1)
9095
}
@@ -99,7 +104,7 @@ func main() {
99104
case "mongod":
100105
memberState, err := healthcheck.HealthCheckMongodLiveness(client, int64(*startupDelaySeconds))
101106
if err != nil {
102-
client.Disconnect(context.TODO()) // nolint:golint,errcheck
107+
client.Disconnect(ctx) // nolint:golint,errcheck
103108
log.Error(err, "Member failed Kubernetes liveness check")
104109
os.Exit(1)
105110
}
@@ -108,7 +113,7 @@ func main() {
108113
case "mongos":
109114
err := healthcheck.HealthCheckMongosLiveness(client)
110115
if err != nil {
111-
client.Disconnect(context.TODO()) // nolint:golint,errcheck
116+
client.Disconnect(ctx) // nolint:golint,errcheck
112117
log.Error(err, "Member failed Kubernetes liveness check")
113118
os.Exit(1)
114119
}
@@ -120,14 +125,14 @@ func main() {
120125
switch *component {
121126

122127
case "mongod":
123-
client.Disconnect(context.TODO()) // nolint:golint,errcheck
128+
client.Disconnect(ctx) // nolint:golint,errcheck
124129
log.Error(err, "readiness check for mongod is not implemented")
125130
os.Exit(1)
126131

127132
case "mongos":
128-
err := healthcheck.MongosReadinessCheck(client)
133+
err := healthcheck.MongosReadinessCheck(ctx, client)
129134
if err != nil {
130-
client.Disconnect(context.TODO()) // nolint:golint,errcheck
135+
client.Disconnect(ctx) // nolint:golint,errcheck
131136
log.Error(err, "Member failed Kubernetes readiness check")
132137
os.Exit(1)
133138
}

healthcheck/health.go

+7-8
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"github.com/pkg/errors"
2323
"go.mongodb.org/mongo-driver/bson"
2424
"go.mongodb.org/mongo-driver/bson/primitive"
25-
mgo "go.mongodb.org/mongo-driver/mongo"
2625
)
2726

2827
// OkMemberStates is a slice of acceptable replication member states
@@ -57,8 +56,8 @@ func isStateOk(memberState *mongo.MemberState, okMemberStates []mongo.MemberStat
5756
}
5857

5958
// HealthCheck checks the replication member state of the local MongoDB member
60-
func HealthCheck(client *mgo.Client, okMemberStates []mongo.MemberState) (State, *mongo.MemberState, error) {
61-
rsStatus, err := mongo.RSStatus(context.TODO(), client)
59+
func HealthCheck(client mongo.Client, okMemberStates []mongo.MemberState) (State, *mongo.MemberState, error) {
60+
rsStatus, err := client.RSStatus(context.TODO())
6261
if err != nil {
6362
return StateFailed, nil, errors.Wrap(err, "get replica set status")
6463
}
@@ -74,8 +73,8 @@ func HealthCheck(client *mgo.Client, okMemberStates []mongo.MemberState) (State,
7473
return StateFailed, state, errors.Errorf("member has unhealthy replication state: %d", state)
7574
}
7675

77-
func HealthCheckMongosLiveness(client *mgo.Client) error {
78-
isMasterResp, err := mongo.IsMaster(context.TODO(), client)
76+
func HealthCheckMongosLiveness(client mongo.Client) error {
77+
isMasterResp, err := client.IsMaster(context.TODO())
7978
if err != nil {
8079
return errors.Wrap(err, "get isMaster response")
8180
}
@@ -87,13 +86,13 @@ func HealthCheckMongosLiveness(client *mgo.Client) error {
8786
return nil
8887
}
8988

90-
func HealthCheckMongodLiveness(client *mgo.Client, startupDelaySeconds int64) (*mongo.MemberState, error) {
91-
isMasterResp, err := mongo.IsMaster(context.TODO(), client)
89+
func HealthCheckMongodLiveness(client mongo.Client, startupDelaySeconds int64) (*mongo.MemberState, error) {
90+
isMasterResp, err := client.IsMaster(context.TODO())
9291
if err != nil {
9392
return nil, errors.Wrap(err, "get isMaster response")
9493
}
9594

96-
buildInfo, err := mongo.RSBuildInfo(context.TODO(), client)
95+
buildInfo, err := client.RSBuildInfo(context.TODO())
9796
if err != nil {
9897
return nil, errors.Wrap(err, "get buildInfo response")
9998
}

healthcheck/readiness.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,23 @@ import (
1919

2020
"github.com/pkg/errors"
2121
"go.mongodb.org/mongo-driver/bson"
22-
"go.mongodb.org/mongo-driver/mongo"
2322
"go.mongodb.org/mongo-driver/mongo/readpref"
23+
24+
"github.com/percona/percona-server-mongodb-operator/pkg/psmdb/mongo"
2425
)
2526

2627
// ReadinessCheck runs a ping on a pmgo.SessionManager to check server readiness
27-
func ReadinessCheck(client *mongo.Client) (State, error) {
28-
if err := client.Ping(context.TODO(), readpref.Primary()); err != nil {
28+
func ReadinessCheck(ctx context.Context, client mongo.Client) (State, error) {
29+
if err := client.Ping(ctx, readpref.Primary()); err != nil {
2930
return StateFailed, errors.Wrap(err, "ping")
3031
}
3132

3233
return StateOk, nil
3334
}
3435

35-
func MongosReadinessCheck(client *mongo.Client) error {
36+
func MongosReadinessCheck(ctx context.Context, client mongo.Client) error {
3637
ss := ServerStatus{}
37-
cur := client.Database("admin").RunCommand(context.TODO(), bson.D{
38+
cur := client.Database("admin").RunCommand(ctx, bson.D{
3839
{Key: "listDatabases", Value: 1},
3940
{Key: "filter", Value: bson.D{{Key: "name", Value: "admin"}}},
4041
{Key: "nameOnly", Value: true}})

healthcheck/tools/db/db.go

+11-9
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,16 @@ import (
2222
log "github.com/sirupsen/logrus"
2323
mgo "go.mongodb.org/mongo-driver/mongo"
2424
"go.mongodb.org/mongo-driver/mongo/options"
25+
26+
"github.com/percona/percona-server-mongodb-operator/pkg/psmdb/mongo"
2527
)
2628

2729
var (
28-
ErrMsgAuthFailedStr string = "server returned error on SASL authentication step: Authentication failed."
29-
ErrNoReachableServersStr string = "no reachable servers"
30+
ErrMsgAuthFailedStr = "server returned error on SASL authentication step: Authentication failed."
31+
ErrNoReachableServersStr = "no reachable servers"
3032
)
3133

32-
func Dial(conf *Config) (*mgo.Client, error) {
34+
func Dial(ctx context.Context, conf *Config) (mongo.Client, error) {
3335
log.WithFields(log.Fields{
3436
"hosts": conf.Hosts,
3537
"ssl": conf.SSL.Enabled,
@@ -52,13 +54,13 @@ func Dial(conf *Config) (*mgo.Client, error) {
5254
log.WithFields(log.Fields{"user": conf.Username}).Debug("Enabling authentication for session")
5355
}
5456

55-
client, err := mgo.Connect(context.TODO(), opts)
57+
client, err := mgo.Connect(ctx, opts)
5658
if err != nil {
5759
return nil, errors.Wrap(err, "connect to mongo replica set")
5860
}
5961

60-
if err := client.Ping(context.TODO(), nil); err != nil {
61-
if err := client.Disconnect(context.TODO()); err != nil {
62+
if err := client.Ping(ctx, nil); err != nil {
63+
if err := client.Disconnect(ctx); err != nil {
6264
return nil, errors.Wrap(err, "disconnect client")
6365
}
6466

@@ -69,15 +71,15 @@ func Dial(conf *Config) (*mgo.Client, error) {
6971
SetServerSelectionTimeout(10 * time.Second).
7072
SetDirect(true)
7173

72-
client, err = mgo.Connect(context.TODO(), opts)
74+
client, err = mgo.Connect(ctx, opts)
7375
if err != nil {
7476
return nil, errors.Wrap(err, "connect to mongo replica set with direct")
7577
}
7678

77-
if err := client.Ping(context.TODO(), nil); err != nil {
79+
if err := client.Ping(ctx, nil); err != nil {
7880
return nil, errors.Wrap(err, "ping mongo")
7981
}
8082
}
8183

82-
return client, nil
84+
return mongo.ToInterface(client), nil
8385
}

pkg/controller/perconaservermongodb/backup.go

+14-14
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@ func (r *ReconcilePerconaServerMongoDB) updatePITR(ctx context.Context, cr *api.
357357
}
358358
}
359359

360-
val, err := pbm.C.GetConfigVar("pitr.enabled")
360+
val, err := pbm.GetConfigVar("pitr.enabled")
361361
if err != nil {
362362
if !errors.Is(err, mongo.ErrNoDocuments) {
363363
return errors.Wrap(err, "get pitr.enabled")
@@ -374,7 +374,7 @@ func (r *ReconcilePerconaServerMongoDB) updatePITR(ctx context.Context, cr *api.
374374
if enabled != cr.Spec.Backup.PITR.Enabled {
375375
val := strconv.FormatBool(cr.Spec.Backup.PITR.Enabled)
376376
log.Info("Setting pitr.enabled in PBM config", "enabled", val)
377-
if err := pbm.C.SetConfigVar("pitr.enabled", val); err != nil {
377+
if err := pbm.SetConfigVar("pitr.enabled", val); err != nil {
378378
return errors.Wrap(err, "update pitr.enabled")
379379
}
380380
}
@@ -383,7 +383,7 @@ func (r *ReconcilePerconaServerMongoDB) updatePITR(ctx context.Context, cr *api.
383383
return nil
384384
}
385385

386-
val, err = pbm.C.GetConfigVar("pitr.oplogSpanMin")
386+
val, err = pbm.GetConfigVar("pitr.oplogSpanMin")
387387
if err != nil {
388388
if !errors.Is(err, mongo.ErrNoDocuments) {
389389
return errors.Wrap(err, "get pitr.oplogSpanMin")
@@ -399,12 +399,12 @@ func (r *ReconcilePerconaServerMongoDB) updatePITR(ctx context.Context, cr *api.
399399

400400
if oplogSpanMin != cr.Spec.Backup.PITR.OplogSpanMin.Float64() {
401401
val := cr.Spec.Backup.PITR.OplogSpanMin.String()
402-
if err := pbm.C.SetConfigVar("pitr.oplogSpanMin", val); err != nil {
402+
if err := pbm.SetConfigVar("pitr.oplogSpanMin", val); err != nil {
403403
return errors.Wrap(err, "update pitr.oplogSpanMin")
404404
}
405405
}
406406

407-
val, err = pbm.C.GetConfigVar("pitr.compression")
407+
val, err = pbm.GetConfigVar("pitr.compression")
408408
var compression = ""
409409
if err != nil {
410410
if errors.Is(err, mongo.ErrNoDocuments) {
@@ -421,23 +421,23 @@ func (r *ReconcilePerconaServerMongoDB) updatePITR(ctx context.Context, cr *api.
421421

422422
if compression != string(cr.Spec.Backup.PITR.CompressionType) {
423423
if string(cr.Spec.Backup.PITR.CompressionType) == "" {
424-
if err := pbm.C.DeleteConfigVar("pitr.compression"); err != nil {
424+
if err := pbm.DeleteConfigVar("pitr.compression"); err != nil {
425425
return errors.Wrap(err, "delete pitr.compression")
426426
}
427-
} else if err := pbm.C.SetConfigVar("pitr.compression", string(cr.Spec.Backup.PITR.CompressionType)); err != nil {
427+
} else if err := pbm.SetConfigVar("pitr.compression", string(cr.Spec.Backup.PITR.CompressionType)); err != nil {
428428
return errors.Wrap(err, "update pitr.compression")
429429
}
430430

431431
// PBM needs to disabling and enabling PITR to change compression type
432-
if err := pbm.C.SetConfigVar("pitr.enabled", "false"); err != nil {
432+
if err := pbm.SetConfigVar("pitr.enabled", "false"); err != nil {
433433
return errors.Wrap(err, "disable pitr")
434434
}
435-
if err := pbm.C.SetConfigVar("pitr.enabled", "true"); err != nil {
435+
if err := pbm.SetConfigVar("pitr.enabled", "true"); err != nil {
436436
return errors.Wrap(err, "enable pitr")
437437
}
438438
}
439439

440-
val, err = pbm.C.GetConfigVar("pitr.compressionLevel")
440+
val, err = pbm.GetConfigVar("pitr.compressionLevel")
441441
var compressionLevel *int = nil
442442
if err != nil {
443443
if errors.Is(err, mongo.ErrNoDocuments) {
@@ -455,18 +455,18 @@ func (r *ReconcilePerconaServerMongoDB) updatePITR(ctx context.Context, cr *api.
455455

456456
if !reflect.DeepEqual(compressionLevel, cr.Spec.Backup.PITR.CompressionLevel) {
457457
if cr.Spec.Backup.PITR.CompressionLevel == nil {
458-
if err := pbm.C.DeleteConfigVar("pitr.compressionLevel"); err != nil {
458+
if err := pbm.DeleteConfigVar("pitr.compressionLevel"); err != nil {
459459
return errors.Wrap(err, "delete pitr.compressionLevel")
460460
}
461-
} else if err := pbm.C.SetConfigVar("pitr.compressionLevel", strconv.FormatInt(int64(*cr.Spec.Backup.PITR.CompressionLevel), 10)); err != nil {
461+
} else if err := pbm.SetConfigVar("pitr.compressionLevel", strconv.FormatInt(int64(*cr.Spec.Backup.PITR.CompressionLevel), 10)); err != nil {
462462
return errors.Wrap(err, "update pitr.compressionLevel")
463463
}
464464

465465
// PBM needs to disabling and enabling PITR to change compression level
466-
if err := pbm.C.SetConfigVar("pitr.enabled", "false"); err != nil {
466+
if err := pbm.SetConfigVar("pitr.enabled", "false"); err != nil {
467467
return errors.Wrap(err, "disable pitr")
468468
}
469-
if err := pbm.C.SetConfigVar("pitr.enabled", "true"); err != nil {
469+
if err := pbm.SetConfigVar("pitr.enabled", "true"); err != nil {
470470
return errors.Wrap(err, "enable pitr")
471471
}
472472
}

pkg/controller/perconaservermongodb/balancer.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"time"
66

77
"github.com/percona/percona-server-mongodb-operator/pkg/psmdb"
8-
"github.com/percona/percona-server-mongodb-operator/pkg/psmdb/mongo"
98
"github.com/pkg/errors"
109
corev1 "k8s.io/api/core/v1"
1110
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
@@ -87,13 +86,13 @@ func (r *ReconcilePerconaServerMongoDB) enableBalancerIfNeeded(ctx context.Conte
8786
}
8887
}()
8988

90-
run, err := mongo.IsBalancerRunning(ctx, mongosSession)
89+
run, err := mongosSession.IsBalancerRunning(ctx)
9190
if err != nil {
9291
return errors.Wrap(err, "failed to check if balancer running")
9392
}
9493

9594
if !run {
96-
err := mongo.StartBalancer(ctx, mongosSession)
95+
err := mongosSession.StartBalancer(ctx)
9796
if err != nil {
9897
return errors.Wrap(err, "failed to start balancer")
9998
}
@@ -133,13 +132,13 @@ func (r *ReconcilePerconaServerMongoDB) disableBalancer(ctx context.Context, cr
133132
}
134133
}()
135134

136-
run, err := mongo.IsBalancerRunning(ctx, mongosSession)
135+
run, err := mongosSession.IsBalancerRunning(ctx)
137136
if err != nil {
138137
return errors.Wrap(err, "failed to check if balancer running")
139138
}
140139

141140
if run {
142-
err := mongo.StopBalancer(ctx, mongosSession)
141+
err := mongosSession.StopBalancer(ctx)
143142
if err != nil {
144143
return errors.Wrap(err, "failed to stop balancer")
145144
}

0 commit comments

Comments
 (0)