Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: unified runner config and hid gopool #380

Merged
merged 1 commit into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 2 additions & 15 deletions connection_onevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,9 @@ import (
"context"
"sync/atomic"

"github.com/bytedance/gopkg/util/gopool"
"github.com/cloudwego/netpoll/internal/runner"
)

var runTask = gopool.CtxGo

func setRunner(runner func(ctx context.Context, f func())) {
runTask = runner
}

func disableGopool() error {
runTask = func(ctx context.Context, f func()) {
go f()
}
return nil
}

// ------------------------------------ implement OnPrepare, OnRequest, CloseCallback ------------------------------------

type gracefulExit interface {
Expand Down Expand Up @@ -272,7 +259,7 @@ func (c *connection) onProcess(onConnect OnConnect, onRequest OnRequest) (proces
} // end of task closure func

// add new task
runTask(c.ctx, task)
runner.RunTask(c.ctx, task)
return true
}

Expand Down
45 changes: 45 additions & 0 deletions internal/runner/runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2025 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package runner

import (
"context"

"github.com/bytedance/gopkg/util/gopool"
)

// RunTask runs the `f` in background, and `ctx` is optional.
// `ctx` is used to pass to underlying implementation
var RunTask func(ctx context.Context, f func())

func goRunTask(ctx context.Context, f func()) {
go f()
}

func init() {
// TODO(xiaost): Disable gopool by default in the future.
// Once we move to use gopool of cloudwego/gopkg in other repos,
// there should be no reason to continue using bytedance/gopkg version,
// and for most users, using the 'go' keyword directly is more suitable.
RunTask = gopool.CtxGo
}

// UseGoRunTask updates RunTask with goRunTask which creates
// a new goroutine for the given func, basically `go f()`
func UseGoRunTask() {
RunTask = goRunTask
}
37 changes: 37 additions & 0 deletions internal/runner/runner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2025 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package runner

import (
"context"
"sync"
"testing"
)

func TestRunTask(t *testing.T) {
var wg sync.WaitGroup
wg.Add(2)
ctx := context.Background()
RunTask(ctx, func() {
wg.Done()
})
UseGoRunTask()
RunTask(ctx, func() {
wg.Done()
})
wg.Wait()
}
5 changes: 2 additions & 3 deletions mux/shard_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ import (
"sync"
"sync/atomic"

"github.com/bytedance/gopkg/util/gopool"

"github.com/cloudwego/netpoll"
"github.com/cloudwego/netpoll/internal/runner"
)

/* DOC:
Expand Down Expand Up @@ -137,7 +136,7 @@ func (q *ShardQueue) foreach() {
if atomic.AddInt32(&q.runNum, 1) > 1 {
return
}
gopool.CtxGo(nil, func() {
runner.RunTask(nil, func() {
var negNum int32 // is negative number of triggerNum
for triggerNum := atomic.LoadInt32(&q.trigger); triggerNum > 0; {
q.r = (q.r + 1) % q.size
Expand Down
15 changes: 10 additions & 5 deletions netpoll_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"os"
"runtime"
"sync"

"github.com/cloudwego/netpoll/internal/runner"
)

var (
Expand Down Expand Up @@ -52,7 +54,7 @@ func Configure(config Config) (err error) {
}

if config.Runner != nil {
setRunner(config.Runner)
runner.RunTask = config.Runner
}
if config.LoggerOutput != nil {
logger = log.New(config.LoggerOutput, "", log.LstdFlags)
Expand Down Expand Up @@ -99,19 +101,22 @@ func SetLoggerOutput(w io.Writer) {
}

// SetRunner set the runner function for every OnRequest/OnConnect callback
// Deprecated: use Configure instead.
//
// Deprecated: use Configure and specify config.Runner instead.
func SetRunner(f func(ctx context.Context, f func())) {
setRunner(f)
runner.RunTask = f
}

// DisableGopool will remove gopool(the goroutine pool used to run OnRequest),
// which means that OnRequest will be run via `go OnRequest(...)`.
// Usually, OnRequest will cause stack expansion, which can be solved by reusing goroutine.
// But if you can confirm that the OnRequest will not cause stack expansion,
// it is recommended to use DisableGopool to reduce redundancy and improve performance.
// Deprecated: use Configure instead.
//
// Deprecated: use Configure() and specify config.Runner instead.
func DisableGopool() error {
return disableGopool()
runner.UseGoRunTask()
return nil
}

// NewEventLoop .
Expand Down
Loading