Skip to content

grpc_test: add tests for cardinality violation #8120

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Apr 25, 2025
85 changes: 85 additions & 0 deletions test/end2end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ var (

var raceMode bool // set by race.go in race mode

// Note : Do not use this for further tests.
type testServer struct {
testgrpc.UnimplementedTestServiceServer

Expand Down Expand Up @@ -3585,6 +3586,90 @@ func testClientStreamingError(t *testing.T, e env) {
}
}

func (s) TestClientStreamingMissingSendAndCloseError(t *testing.T) {
// TODO : https://github.com/grpc/grpc-go/issues/8119 - remove `t.Skip()`
// after this is fixed.
t.Skip()
testClientStreamingMissingSendAndCloseError(t)
}

func testClientStreamingMissingSendAndCloseError(t *testing.T) {
ss := &stubserver.StubServer{
StreamingInputCallF: func(_ testgrpc.TestService_StreamingInputCallServer) error {
return nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be comments here and there explaining why the test is doing what it is. Like "// Immediately return a success without first sending a response message, which is illegal."

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

},
}
if err := ss.Start(nil); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
}
defer ss.Stop()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

stream, err := ss.Client.StreamingInputCall(ctx)
if err != nil {
t.Fatalf(".StreamingInputCall(_) = _, %v, want <nil>", err)
}

if _, err := stream.CloseAndRecv(); status.Code(err) != codes.Internal {
t.Fatalf("%v.CloseAndRecv() = %v, want error %s", stream, err, codes.Internal)
}
}

func (s) TestClientStreamingRecvAfterCloseError(t *testing.T) {
// TODO : https://github.com/grpc/grpc-go/issues/8119 - remove `t.Skip()`
// after this is fixed.
t.Skip()
testClientStreamingRecvAfterCloseError(t)
}

func testClientStreamingRecvAfterCloseError(t *testing.T) {
ss := &stubserver.StubServer{
StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error {
sum := 0
in, _ := stream.Recv()
p := in.GetPayload().GetBody()
sum += len(p)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sum := len(p) ? But why is it a sum? Probably this was left over from an old iteration. You can just delete it and return len(p) directly in the response message, too. Or, better, just return 0 in the response. The value is unimportant.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

stream.SendAndClose(&testpb.StreamingInputCallResponse{
AggregatedPayloadSize: int32(sum),
})
_, err := stream.Recv()
return err
},
}
if err := ss.Start(nil); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
}
defer ss.Stop()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

stream, err := ss.Client.StreamingInputCall(ctx)
if err != nil {
t.Fatalf("StreamingInputCall(_) = _, %v, want <nil>", err)
}
payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 1)
if err != nil {
t.Fatal(err)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems easier to just delete this. Send empty requests instead...

Less code is usually better code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


req := &testpb.StreamingInputCallRequest{
Payload: payload,
}

for {
if err = stream.Send(req); err == nil {
continue
}
if _, err := stream.CloseAndRecv(); status.Code(err) != codes.Internal {
t.Fatalf("%v.CloseAndRecv() = %v, want error %s", stream, err, codes.Internal)
}
break
}
}

func (s) TestExceedMaxStreamsLimit(t *testing.T) {
for _, e := range listTestEnv() {
testExceedMaxStreamsLimit(t, e)
Expand Down