@@ -108,12 +108,7 @@ func TestBasicPubSub(t *testing.T) {
108108 return nil
109109 }
110110 n .RegisterHandler (notifier .TestHandlerID , testHandler )
111-
112- done := make (chan struct {})
113- go func () {
114- n .OnBecomeOwner ()
115- close (done )
116- }()
111+ n .OnBecomeOwner ()
117112
118113 tk2 := testkit .NewTestKit (t , store )
119114 se := sess .NewSession (tk2 .Session ())
@@ -138,7 +133,6 @@ func TestBasicPubSub(t *testing.T) {
138133 require .Equal (t , event2 , seenChanges [1 ])
139134 require .Equal (t , event3 , seenChanges [2 ])
140135 n .OnRetireOwner ()
141- <- done
142136}
143137
144138func TestDeliverOrderAndCleanup (t * testing.T ) {
@@ -186,12 +180,7 @@ func TestDeliverOrderAndCleanup(t *testing.T) {
186180 n .RegisterHandler (3 , h1 )
187181 n .RegisterHandler (4 , h2 )
188182 n .RegisterHandler (9 , h3 )
189-
190- done := make (chan struct {})
191- go func () {
192- n .OnBecomeOwner ()
193- close (done )
194- }()
183+ n .OnBecomeOwner ()
195184
196185 tk2 := testkit .NewTestKit (t , store )
197186 se := sess .NewSession (tk2 .Session ())
@@ -220,7 +209,6 @@ func TestDeliverOrderAndCleanup(t *testing.T) {
220209 require .Equal (t , []int64 {1000 , 1001 , 1002 }, * id3 )
221210
222211 n .OnRetireOwner ()
223- <- done
224212}
225213
226214func TestPubSub (t * testing.T ) {
@@ -352,12 +340,7 @@ func Test2OwnerForAShortTime(t *testing.T) {
352340 return nil
353341 }
354342 n .RegisterHandler (notifier .TestHandlerID , testHandler )
355-
356- done := make (chan struct {})
357- go func () {
358- n .OnBecomeOwner ()
359- close (done )
360- }()
343+ n .OnBecomeOwner ()
361344
362345 tk2 := testkit .NewTestKit (t , store )
363346 se := sess .NewSession (tk2 .Session ())
@@ -383,7 +366,6 @@ func Test2OwnerForAShortTime(t *testing.T) {
383366 tk2 .MustQuery ("SELECT * FROM test.result" ).Check (testkit .Rows ())
384367
385368 n .OnRetireOwner ()
386- <- done
387369}
388370
389371func TestPaginatedList (t * testing.T ) {
@@ -452,3 +434,58 @@ func TestPaginatedList(t *testing.T) {
452434 return count .Load () == 8
453435 }, 5 * time .Second , 500 * time .Millisecond )
454436}
437+
438+ func TestBeginTwice (t * testing.T ) {
439+ conf := new (log.Config )
440+ logFilename := path .Join (t .TempDir (), "/testBeginTwice.log" )
441+ conf .File .Filename = logFilename
442+ lg , p , e := log .InitLogger (conf )
443+ require .NoError (t , e )
444+ rs := log .ReplaceGlobals (lg , p )
445+ defer rs ()
446+
447+ store := testkit .CreateMockStore (t )
448+ tk := testkit .NewTestKit (t , store )
449+ tk .MustExec ("USE test" )
450+ tk .MustExec ("DROP TABLE IF EXISTS " + ddl .NotifierTableName )
451+ tk .MustExec (ddl .NotifierTableSQL )
452+
453+ s := notifier .OpenTableStore ("test" , ddl .NotifierTableName )
454+ sessionPool := util .NewSessionPool (
455+ 5 ,
456+ func () (pools.Resource , error ) {
457+ return testkit .NewTestKit (t , store ).Session (), nil
458+ },
459+ nil ,
460+ nil ,
461+ )
462+
463+ n := notifier .NewDDLNotifier (sessionPool , s , 50 * time .Millisecond )
464+
465+ testHandler := func (context.Context , sessionctx.Context , * notifier.SchemaChangeEvent ) error {
466+ return nil
467+ }
468+ n .RegisterHandler (notifier .TestHandlerID , testHandler )
469+ n .OnBecomeOwner ()
470+
471+ tk2 := testkit .NewTestKit (t , store )
472+ se := sess .NewSession (tk2 .Session ())
473+ ctx := context .Background ()
474+ event1 := notifier .NewCreateTableEvent (& model.TableInfo {ID : 1000 , Name : ast .NewCIStr ("t1" )})
475+ err := notifier .PubSchemeChangeToStore (ctx , se , 1 , - 1 , event1 , s )
476+ require .NoError (t , err )
477+
478+ // after handler processed the event, wait to ensure the record is deleted by DDL notifier
479+ require .Eventually (t , func () bool {
480+ changes := make ([]* notifier.SchemaChange , 8 )
481+ result , closeFn := s .List (ctx , se )
482+ count , err2 := result .Read (changes )
483+ require .NoError (t , err2 )
484+ closeFn ()
485+ return count == 0
486+ }, time .Second , 50 * time .Millisecond )
487+
488+ content , err := os .ReadFile (logFilename )
489+ require .NoError (t , err )
490+ require .NotContains (t , string (content ), "context provider not set" )
491+ }
0 commit comments