Skip to content

Commit fdc50b0

Browse files
lance6716ti-chi-bot
authored andcommitted
ddl notifier: fix shared session by List and DeleteAndCommit (pingcap#58833)
close pingcap#58824
1 parent a0097bd commit fdc50b0

File tree

3 files changed

+69
-25
lines changed

3 files changed

+69
-25
lines changed

pkg/ddl/notifier/BUILD.bazel

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ go_test(
3636
],
3737
embed = [":notifier"],
3838
flaky = True,
39-
shard_count = 9,
39+
shard_count = 10,
4040
deps = [
4141
"//pkg/ddl",
4242
"//pkg/ddl/session",

pkg/ddl/notifier/subscribe.go

+10-3
Original file line numberDiff line numberDiff line change
@@ -231,12 +231,19 @@ func (n *DDLNotifier) processEvents(ctx context.Context) error {
231231
}
232232

233233
if change.processedByFlag == n.handlersBitMap {
234-
if err3 := n.store.DeleteAndCommit(
234+
s3, err3 := n.sysSessionPool.Get()
235+
if err3 != nil {
236+
return errors.Trace(err3)
237+
}
238+
sess4Del := sess.NewSession(s3.(sessionctx.Context))
239+
err3 = n.store.DeleteAndCommit(
235240
ctx,
236-
sess4List,
241+
sess4Del,
237242
change.ddlJobID,
238243
int(change.subJobID),
239-
); err3 != nil {
244+
)
245+
n.sysSessionPool.Put(s3)
246+
if err3 != nil {
240247
logutil.Logger(ctx).Error("Error deleting change",
241248
zap.Int64("ddlJobID", change.ddlJobID),
242249
zap.Int64("subJobID", change.subJobID),

pkg/ddl/notifier/testkit_test.go

+58-21
Original file line numberDiff line numberDiff line change
@@ -108,12 +108,7 @@ func TestBasicPubSub(t *testing.T) {
108108
return nil
109109
}
110110
n.RegisterHandler(notifier.TestHandlerID, testHandler)
111-
112-
done := make(chan struct{})
113-
go func() {
114-
n.OnBecomeOwner()
115-
close(done)
116-
}()
111+
n.OnBecomeOwner()
117112

118113
tk2 := testkit.NewTestKit(t, store)
119114
se := sess.NewSession(tk2.Session())
@@ -138,7 +133,6 @@ func TestBasicPubSub(t *testing.T) {
138133
require.Equal(t, event2, seenChanges[1])
139134
require.Equal(t, event3, seenChanges[2])
140135
n.OnRetireOwner()
141-
<-done
142136
}
143137

144138
func TestDeliverOrderAndCleanup(t *testing.T) {
@@ -186,12 +180,7 @@ func TestDeliverOrderAndCleanup(t *testing.T) {
186180
n.RegisterHandler(3, h1)
187181
n.RegisterHandler(4, h2)
188182
n.RegisterHandler(9, h3)
189-
190-
done := make(chan struct{})
191-
go func() {
192-
n.OnBecomeOwner()
193-
close(done)
194-
}()
183+
n.OnBecomeOwner()
195184

196185
tk2 := testkit.NewTestKit(t, store)
197186
se := sess.NewSession(tk2.Session())
@@ -220,7 +209,6 @@ func TestDeliverOrderAndCleanup(t *testing.T) {
220209
require.Equal(t, []int64{1000, 1001, 1002}, *id3)
221210

222211
n.OnRetireOwner()
223-
<-done
224212
}
225213

226214
func TestPubSub(t *testing.T) {
@@ -352,12 +340,7 @@ func Test2OwnerForAShortTime(t *testing.T) {
352340
return nil
353341
}
354342
n.RegisterHandler(notifier.TestHandlerID, testHandler)
355-
356-
done := make(chan struct{})
357-
go func() {
358-
n.OnBecomeOwner()
359-
close(done)
360-
}()
343+
n.OnBecomeOwner()
361344

362345
tk2 := testkit.NewTestKit(t, store)
363346
se := sess.NewSession(tk2.Session())
@@ -383,7 +366,6 @@ func Test2OwnerForAShortTime(t *testing.T) {
383366
tk2.MustQuery("SELECT * FROM test.result").Check(testkit.Rows())
384367

385368
n.OnRetireOwner()
386-
<-done
387369
}
388370

389371
func TestPaginatedList(t *testing.T) {
@@ -452,3 +434,58 @@ func TestPaginatedList(t *testing.T) {
452434
return count.Load() == 8
453435
}, 5*time.Second, 500*time.Millisecond)
454436
}
437+
438+
func TestBeginTwice(t *testing.T) {
439+
conf := new(log.Config)
440+
logFilename := path.Join(t.TempDir(), "/testBeginTwice.log")
441+
conf.File.Filename = logFilename
442+
lg, p, e := log.InitLogger(conf)
443+
require.NoError(t, e)
444+
rs := log.ReplaceGlobals(lg, p)
445+
defer rs()
446+
447+
store := testkit.CreateMockStore(t)
448+
tk := testkit.NewTestKit(t, store)
449+
tk.MustExec("USE test")
450+
tk.MustExec("DROP TABLE IF EXISTS " + ddl.NotifierTableName)
451+
tk.MustExec(ddl.NotifierTableSQL)
452+
453+
s := notifier.OpenTableStore("test", ddl.NotifierTableName)
454+
sessionPool := util.NewSessionPool(
455+
5,
456+
func() (pools.Resource, error) {
457+
return testkit.NewTestKit(t, store).Session(), nil
458+
},
459+
nil,
460+
nil,
461+
)
462+
463+
n := notifier.NewDDLNotifier(sessionPool, s, 50*time.Millisecond)
464+
465+
testHandler := func(context.Context, sessionctx.Context, *notifier.SchemaChangeEvent) error {
466+
return nil
467+
}
468+
n.RegisterHandler(notifier.TestHandlerID, testHandler)
469+
n.OnBecomeOwner()
470+
471+
tk2 := testkit.NewTestKit(t, store)
472+
se := sess.NewSession(tk2.Session())
473+
ctx := context.Background()
474+
event1 := notifier.NewCreateTableEvent(&model.TableInfo{ID: 1000, Name: ast.NewCIStr("t1")})
475+
err := notifier.PubSchemeChangeToStore(ctx, se, 1, -1, event1, s)
476+
require.NoError(t, err)
477+
478+
// after handler processed the event, wait to ensure the record is deleted by DDL notifier
479+
require.Eventually(t, func() bool {
480+
changes := make([]*notifier.SchemaChange, 8)
481+
result, closeFn := s.List(ctx, se)
482+
count, err2 := result.Read(changes)
483+
require.NoError(t, err2)
484+
closeFn()
485+
return count == 0
486+
}, time.Second, 50*time.Millisecond)
487+
488+
content, err := os.ReadFile(logFilename)
489+
require.NoError(t, err)
490+
require.NotContains(t, string(content), "context provider not set")
491+
}

0 commit comments

Comments
 (0)