Skip to content

Commit

Permalink
fix: fix connection notify (#3335)
Browse files Browse the repository at this point in the history
Signed-off-by: Song Gao <[email protected]>
  • Loading branch information
Yisaer authored Oct 25, 2024
1 parent 1414313 commit 9463f7d
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 20 deletions.
27 changes: 10 additions & 17 deletions pkg/connection/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@
package connection

import (
rawContext "context"
"sync"
"sync/atomic"

"github.com/lf-edge/ekuiper/contract/v2/api"

"github.com/lf-edge/ekuiper/v2/internal/conf"
"github.com/lf-edge/ekuiper/v2/internal/topo/context"
"github.com/lf-edge/ekuiper/v2/pkg/modules"
)
Expand All @@ -32,10 +30,8 @@ type ConnWrapper struct {
conn modules.Connection
err error
l sync.RWMutex
// context for connection wait only
waitCtx rawContext.Context
// context for the lifecycle
stop rawContext.CancelFunc
readCh chan struct{}
detachCh chan struct{}
}

func (cw *ConnWrapper) setConn(conn modules.Connection, err error) {
Expand All @@ -50,7 +46,8 @@ func (cw *ConnWrapper) Wait(connectorCtx api.StreamContext) (modules.Connection,
select {
case <-connectorCtx.Done():
connectorCtx.GetLogger().Infof("stop waiting connection")
case <-cw.waitCtx.Done():
case <-cw.readCh:
case <-cw.detachCh:
}
cw.l.RLock()
defer cw.l.RUnlock()
Expand All @@ -63,20 +60,16 @@ func (cw *ConnWrapper) IsInitialized() bool {
return cw.initialized
}

func newConnWrapper(callerCtx api.StreamContext, meta *Meta) *ConnWrapper {
callerCtx.GetLogger().Infof("creating new connection wrapper")
wctx, onConnect := rawContext.WithCancel(rawContext.Background())
contextLogger := conf.Log.WithField("conn", meta.ID)
connCtx, stop := context.WithValue(context.Background(), context.LoggerKey, contextLogger).WithCancel()
func newConnWrapper(ctx api.StreamContext, meta *Meta) *ConnWrapper {
cw := &ConnWrapper{
ID: meta.ID,
waitCtx: wctx,
stop: stop,
ID: meta.ID,
readCh: make(chan struct{}),
detachCh: make(chan struct{}),
}
go func() {
conn, err := createConnection(connCtx, meta)
conn, err := createConnection(ctx, meta)
cw.setConn(conn, err)
onConnect()
close(cw.readCh)
}()
return cw
}
Expand Down
4 changes: 1 addition & 3 deletions pkg/connection/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,9 +321,7 @@ func detachConnection(ctx api.StreamContext, conId string) error {
globalConnectionManager.connectionPool[conId] = meta
conf.Log.Infof("detachConnection remove conn:%v,ref:%v", conId, refId)
if !meta.Named && meta.GetRefCount() == 0 {
if meta.cw.stop != nil {
meta.cw.stop()
}
close(meta.cw.detachCh)
conn, err := meta.cw.Wait(ctx)
if conn != nil && err == nil {
conn.Close(ctx)
Expand Down

0 comments on commit 9463f7d

Please sign in to comment.