diff --git a/.github/workflows/push-check.yml b/.github/workflows/push-check.yml index 4f93cd4..c43224a 100644 --- a/.github/workflows/push-check.yml +++ b/.github/workflows/push-check.yml @@ -6,19 +6,11 @@ jobs: build: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 - + - uses: actions/checkout@v4 - name: Set up Go - uses: actions/setup-go@v2 - with: - go-version: "1.20" - - - uses: actions/cache@v2 + uses: actions/setup-go@v5 with: - path: ~/go/pkg/mod - key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }} - restore-keys: | - ${{ runner.os }}-go- + go-version: stable - name: Check License Header uses: apache/skywalking-eyes/header@v0.4.0 diff --git a/kitexgrpc/compressor/grpc_compressor_test.go b/kitexgrpc/compressor/grpc_compressor_test.go index 96f1dd1..95623fa 100644 --- a/kitexgrpc/compressor/grpc_compressor_test.go +++ b/kitexgrpc/compressor/grpc_compressor_test.go @@ -23,6 +23,7 @@ import ( "github.com/cloudwego/kitex-tests/kitex_gen/protobuf/grpc_demo/servicea" "github.com/cloudwego/kitex-tests/pkg/test" + "github.com/cloudwego/kitex-tests/pkg/utils" client_opt "github.com/cloudwego/kitex/client" "github.com/cloudwego/kitex/client/callopt" "github.com/cloudwego/kitex/pkg/endpoint" @@ -42,9 +43,10 @@ func TestKitexWithoutCompressor(t *testing.T) { test.Assert(t, err == nil, err) }() defer svr.Stop() - time.Sleep(time.Second) + time.Sleep(50 * time.Millisecond) client, err := GetClient(hostport) test.Assert(t, err == nil, err) + defer utils.CallClose(client) resp, err := client.RunUnary() test.Assert(t, err == nil, err) test.Assert(t, resp != nil && resp.Message == "Kitex Hello!") @@ -68,9 +70,11 @@ func TestKitexCompressor(t *testing.T) { test.Assert(t, err == nil, err) }() defer svr.Stop() - time.Sleep(time.Second) + time.Sleep(50 * time.Millisecond) client, err := GetClient(hostport) test.Assert(t, err == nil, err) + defer utils.CallClose(client) + resp, err := client.RunUnary(callopt.WithGRPCCompressor(kitex_gzip.Name)) test.Assert(t, err == nil, err) test.Assert(t, resp != nil && resp.Message == "Kitex Hello!") @@ -94,13 +98,15 @@ func TestKitexCompressorWithGRPCClient(t *testing.T) { test.Assert(t, err == nil, err) }() defer svr.Stop() - time.Sleep(time.Second) + time.Sleep(50 * time.Millisecond) conn, err := grpc.Dial(hostport, grpc.WithInsecure(), grpc.WithBlock()) test.Assert(t, err == nil, err) defer conn.Close() client, err := GetGRPCClient(hostport) test.Assert(t, err == nil, err) + defer utils.CallClose(client) + resp, err := client.RunUnary(grpc.UseCompressor(gzip.Name)) test.Assert(t, err == nil, err) test.Assert(t, resp != nil && resp.Message == "Grpc Hello!") @@ -135,10 +141,12 @@ func TestKitexCompressorWithGRPCServer(t *testing.T) { err := RunGRPCServer(hostport) test.Assert(t, err == nil, err) }() - time.Sleep(time.Second) + time.Sleep(50 * time.Millisecond) client, err := GetClient(hostport, client_opt.WithMiddleware(ServiceNameMW)) test.Assert(t, err == nil, err) + defer utils.CallClose(client) + resp, err := client.RunUnary(callopt.WithGRPCCompressor(kitex_gzip.Name)) test.Assert(t, err == nil, err) test.Assert(t, resp != nil && resp.Message == "Kitex Hello!") diff --git a/kitexgrpc/compressor/kitex_client.go b/kitexgrpc/compressor/kitex_client.go index 78476e5..40ee159 100644 --- a/kitexgrpc/compressor/kitex_client.go +++ b/kitexgrpc/compressor/kitex_client.go @@ -16,13 +16,14 @@ package compressor import ( "context" + "io" + "strconv" + "github.com/cloudwego/kitex-tests/kitex_gen/protobuf/grpc_demo" "github.com/cloudwego/kitex-tests/kitex_gen/protobuf/grpc_demo/servicea" "github.com/cloudwego/kitex/client" "github.com/cloudwego/kitex/client/callopt" "github.com/cloudwego/kitex/transport" - "io" - "strconv" ) type ClientWrapper struct { @@ -45,7 +46,6 @@ func (c *ClientWrapper) RunUnary(callOptions ...callopt.Option) (*grpc_demo.Repl } func (c *ClientWrapper) RunClientStream(callOptions ...callopt.Option) (*grpc_demo.Reply, error) { - ctx := context.Background() streamCli, err := c.client.CallClientStream(ctx) if err != nil { diff --git a/kitexgrpc/multi_service/multi_service_test.go b/kitexgrpc/multi_service/multi_service_test.go index a1bfae5..e212948 100644 --- a/kitexgrpc/multi_service/multi_service_test.go +++ b/kitexgrpc/multi_service/multi_service_test.go @@ -31,6 +31,7 @@ import ( "github.com/cloudwego/kitex-tests/kitex_gen/protobuf/grpc_multi_service_2" servicea2 "github.com/cloudwego/kitex-tests/kitex_gen/protobuf/grpc_multi_service_2/servicea" "github.com/cloudwego/kitex-tests/pkg/test" + "github.com/cloudwego/kitex-tests/pkg/utils" ) type ServiceAImpl struct{} @@ -93,6 +94,7 @@ func TestMultiService(t *testing.T) { clientA, err := servicea.NewClient("ServiceA", client.WithTransportProtocol(transport.GRPC), client.WithHostPorts(hostport)) test.Assert(t, err == nil, err) + defer utils.CallClose(clientA) streamCliA, err := clientA.EchoA(context.Background()) test.Assert(t, err == nil, err) @@ -100,15 +102,19 @@ func TestMultiService(t *testing.T) { respA, err := streamCliA.Recv() test.Assert(t, err == nil) test.Assert(t, respA.Message == "ServiceA") + streamCliA.Close() clientB, err := serviceb.NewClient("ServiceB", client.WithTransportProtocol(transport.GRPC), client.WithHostPorts(hostport)) test.Assert(t, err == nil, err) + defer utils.CallClose(clientB) + streamCliB, err := clientB.EchoB(context.Background()) test.Assert(t, err == nil, err) streamCliB.Send(&grpc_multi_service.RequestB{Name: "ServiceB"}) respB, err := streamCliB.Recv() test.Assert(t, err == nil) test.Assert(t, respB.Message == "ServiceB") + streamCliB.Close() } func TestUnknownException(t *testing.T) { @@ -121,6 +127,8 @@ func TestUnknownException(t *testing.T) { clientC, err := servicec.NewClient("ServiceC", client.WithTransportProtocol(transport.GRPC), client.WithHostPorts(hostport)) test.Assert(t, err == nil, err) + defer utils.CallClose(clientC) + streamCliC, err := clientC.EchoC(context.Background()) test.Assert(t, err == nil, err) streamCliC.Send(&grpc_multi_service.RequestC{Name: "ServiceC"}) @@ -138,6 +146,8 @@ func TestUnknownExceptionWithMultiService(t *testing.T) { // unknown service error clientC, err := servicec.NewClient("ServiceC", client.WithTransportProtocol(transport.GRPC), client.WithHostPorts(hostport)) test.Assert(t, err == nil, err) + defer utils.CallClose(clientC) + streamCliC, err := clientC.EchoC(context.Background()) test.Assert(t, err == nil, err) streamCliC.Send(&grpc_multi_service.RequestC{Name: "ServiceC"}) @@ -148,6 +158,8 @@ func TestUnknownExceptionWithMultiService(t *testing.T) { // unknown method error clientA, err := servicea2.NewClient("ServiceA", client.WithTransportProtocol(transport.GRPC), client.WithHostPorts(hostport)) test.Assert(t, err == nil, err) + defer utils.CallClose(clientA) + streamCliA, err := clientA.Echo(context.Background()) test.Assert(t, err == nil, err) streamCliA.Send(&grpc_multi_service_2.Request{Name: "ServiceA"}) diff --git a/kitexgrpc/unknown_handler/unknown_service_handler_test.go b/kitexgrpc/unknown_handler/unknown_service_handler_test.go index d6d480f..2b431af 100644 --- a/kitexgrpc/unknown_handler/unknown_service_handler_test.go +++ b/kitexgrpc/unknown_handler/unknown_service_handler_test.go @@ -30,6 +30,7 @@ import ( "github.com/cloudwego/kitex-tests/kitex_gen/protobuf/unknown_handler/servicea" "github.com/cloudwego/kitex-tests/kitex_gen/protobuf/unknown_handler/serviceb" "github.com/cloudwego/kitex-tests/pkg/test" + "github.com/cloudwego/kitex-tests/pkg/utils" ) type ServiceAImpl struct{} @@ -65,9 +66,10 @@ func TestUnknownServiceError(t *testing.T) { go svr.Run() defer svr.Stop() - time.Sleep(time.Second) + time.Sleep(50 * time.Millisecond) client, err := servicea.NewClient("grpcService", client.WithTransportProtocol(transport.GRPC), client.WithHostPorts(ip)) test.Assert(t, err == nil, err) + defer utils.CallClose(client) req := &unknown_handler.Request{Name: "kitex"} _, err = client.Echo(context.Background(), req) @@ -82,9 +84,10 @@ func TestUnknownServiceHandler(t *testing.T) { go svr.Run() defer svr.Stop() - time.Sleep(time.Second) + time.Sleep(50 * time.Millisecond) client, err := servicea.NewClient("grpcService", client.WithTransportProtocol(transport.GRPC), client.WithHostPorts(ip)) test.Assert(t, err == nil, err) + defer utils.CallClose(client) req := &unknown_handler.Request{Name: "kitex"} resp, err := client.Echo(context.Background(), req) diff --git a/pkg/utils/client.go b/pkg/utils/client.go new file mode 100644 index 0000000..c8b70d8 --- /dev/null +++ b/pkg/utils/client.go @@ -0,0 +1,94 @@ +/* + * Copyright 2024 CloudWeGo Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package utils + +import ( + "io" + "reflect" + "unsafe" +) + +const maxdepath = 10 + +// CallClose calls underlying Close method which provided by kitex/client. +// XXX: kitex tool doesn't generate Close method for client, and we need it for testing +// or Server.Stop will block till timeout +func CallClose(v interface{}) bool { + // likely ok=false, or we should call Close directly + if f, ok := v.(io.Closer); ok { + f.Close() + return true + } + rv := reflect.ValueOf(v) + p := rv.UnsafePointer() + ret := callClose(rv, p, maxdepath) + return ret +} + +// callClose returns true if it can stop +func callClose(rv reflect.Value, p unsafe.Pointer, depth int) bool { + if depth == 0 { + return false + } + if rv.Kind() == reflect.Pointer { + if tryCallClose(rv, p) { + return true + } + rv = rv.Elem() + } + if rv.Kind() != reflect.Interface && rv.Kind() != reflect.Struct { + return false + } + if rv.Kind() == reflect.Interface { + rv = rv.Elem() + p = unsafe.Add(p, unsafe.Sizeof(uintptr(0))) + p = *(*unsafe.Pointer)(p) // get data pointer of eface + if tryCallClose(rv, p) { + return true + } + if rv.Kind() == reflect.Pointer { + rv = rv.Elem() + } + if rv.Kind() != reflect.Struct { + panic("unknown type: " + rv.Type().String()) + } + } + rv = rv.Field(0) // for generated code, always 1 field + if rv.Kind() == reflect.Pointer { + p = *(*unsafe.Pointer)(p) + } + return callClose(rv, p, depth-1) +} + +func tryCallClose(rv reflect.Value, p unsafe.Pointer) bool { + // check pointer to struct + if rv.Kind() != reflect.Pointer { + return false + } + if rv.Elem().Kind() != reflect.Struct { + return false + } + + // use reflect.NewAt to bypass unexported field issue + v := reflect.NewAt(rv.Type().Elem(), p) + f, ok := v.Interface().(io.Closer) + if ok { + f.Close() + return true + } + return false +} diff --git a/pkg/utils/client_test.go b/pkg/utils/client_test.go new file mode 100644 index 0000000..c9d6db1 --- /dev/null +++ b/pkg/utils/client_test.go @@ -0,0 +1,53 @@ +/* + * Copyright 2024 CloudWeGo Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package utils + +import ( + "testing" + "unsafe" +) + +type clientWithClose struct { + closed bool +} + +func (c *clientWithClose) Close() error { + c.closed = true + return nil +} + +func TestCallClose(t *testing.T) { + cli := &clientWithClose{} + t.Log("*clientWithClose", unsafe.Pointer(cli)) + + type TestClient0 struct { + *clientWithClose + } + c0 := TestClient0{cli} + type TestClient1 struct { + TestClient0 + } + c1 := TestClient1{c0} + type TestClient2 struct { + cli TestClient1 + } + c2 := &TestClient2{c1} + CallClose(c2) + if !cli.closed { + t.Fatal("not close") + } +} diff --git a/thrift_streaming/generate.sh b/thrift_streaming/generate.sh index efea863..8a37d64 100755 --- a/thrift_streaming/generate.sh +++ b/thrift_streaming/generate.sh @@ -151,6 +151,7 @@ cd exitserver cp go.mod go.mod.bak cp go.sum go.sum.bak go get -u github.com/bytedance/sonic +go get -u github.com/choleraehyq/pid go build # recover the change above, make sure git diff is clear mv go.mod.bak go.mod