Skip to content

Commit

Permalink
test: gRPC short connection should be closed after call (#68)
Browse files Browse the repository at this point in the history
  • Loading branch information
ppzqh authored Apr 20, 2024
1 parent 2c0d166 commit beb6c00
Showing 1 changed file with 82 additions and 3 deletions.
85 changes: 82 additions & 3 deletions kitexgrpc/normalcall/normalcall_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,24 @@ package normalcall

import (
"context"
"io"
"net"
"os"
"strconv"
"strings"
"testing"
"time"

"github.com/cloudwego/kitex-tests/kitex_gen/protobuf/grpc_demo"
"github.com/cloudwego/kitex-tests/kitex_gen/protobuf/grpc_demo/servicea"
"github.com/cloudwego/kitex-tests/pkg/test"
"github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/pkg/endpoint"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/cloudwego/kitex/server"
netutil "github.com/shirou/gopsutil/v3/net"

"github.com/cloudwego/kitex-tests/kitex_gen/protobuf/grpc_demo"
"github.com/cloudwego/kitex-tests/kitex_gen/protobuf/grpc_demo/servicea"
"github.com/cloudwego/kitex-tests/pkg/test"
)

type ServerAHandler struct {
Expand All @@ -37,6 +44,21 @@ func (h *ServerAHandler) CallUnary(ctx context.Context, req *grpc_demo.Request)
return &grpc_demo.Reply{Message: req.Name}, nil
}

func (h *ServerAHandler) CallClientStream(stream grpc_demo.ServiceA_CallClientStreamServer) (err error) {
var msgs []string
for {
req, err := stream.Recv()
if err != nil {
if err == io.EOF {
break
}
return err
}
msgs = append(msgs, req.Name)
}
return stream.SendAndClose(&grpc_demo.Reply{Message: "all message: " + strings.Join(msgs, ", ")})
}

func serverAddr(hostPort string) *net.TCPAddr {
addr, err := net.ResolveTCPAddr("tcp", hostPort)
if err != nil {
Expand Down Expand Up @@ -85,3 +107,60 @@ func TestDisableRPCInfoReuse(t *testing.T) {
test.Assert(t, ri.Invocation().MethodName() != "", ri.Invocation().MethodName())
})
}

func TestShortConnection(t *testing.T) {
svrIP, svrPort := "127.0.0.1", "19005"
svrAddrStr := svrIP + ":" + svrPort
svr := servicea.NewServer(
&ServerAHandler{},
server.WithServiceAddr(serverAddr(svrAddrStr)),
)
go func() {
if err := svr.Run(); err != nil {
panic(err)
}
}()
time.Sleep(time.Second)
defer svr.Stop()

cli, err := servicea.NewClient("servicea",
client.WithHostPorts(svrAddrStr),
client.WithShortConnection(),
)
test.Assert(t, err == nil)

clientStream, err := cli.CallClientStream(context.Background())
test.Assert(t, err == nil)
err = clientStream.Send(&grpc_demo.Request{Name: "1"})
test.Assert(t, err == nil)
_, err = clientStream.CloseAndRecv()
test.Assert(t, err == nil)
time.Sleep(time.Second)

// the connection should not exist after the call
exist, err := checkEstablishedConnection(svrIP, svrPort)
if err != nil {
klog.Warnf("check established connection failed, error=%v", err)
} else {
test.Assert(t, err == nil)
test.Assert(t, exist == false)
}
}

// return true if current pid established connection with the input remote ip and port
func checkEstablishedConnection(ip, port string) (bool, error) {
pid := os.Getpid()
conns, err := netutil.ConnectionsPid("all", int32(pid))
if err != nil {
return false, err
}

for _, c := range conns {
if c.Status == "ESTABLISHED" {
if strconv.Itoa(int(c.Raddr.Port)) == port && c.Raddr.IP == ip {
return true, nil
}
}
}
return false, nil
}

0 comments on commit beb6c00

Please sign in to comment.