Skip to content

Commit

Permalink
fix: wait wg for unary invocation
Browse files Browse the repository at this point in the history
  • Loading branch information
DMwangnima committed Sep 20, 2024
1 parent 9edb312 commit e347060
Showing 1 changed file with 25 additions and 15 deletions.
40 changes: 25 additions & 15 deletions thrift_streaming/thrift_tracing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ import (
"time"

"github.com/bytedance/gopkg/cloud/metainfo"
"github.com/cloudwego/kitex-tests/common"
"github.com/cloudwego/kitex-tests/pkg/test"
"github.com/cloudwego/kitex-tests/thrift_streaming/kitex_gen/echo"
"github.com/cloudwego/kitex-tests/thrift_streaming/kitex_gen/echo/echoservice"
"github.com/cloudwego/kitex/client/streamclient"
"github.com/cloudwego/kitex/pkg/endpoint"
"github.com/cloudwego/kitex/pkg/kerrors"
Expand All @@ -39,6 +35,11 @@ import (
"github.com/cloudwego/kitex/pkg/stats"
"github.com/cloudwego/kitex/pkg/streaming"
"github.com/cloudwego/kitex/server"

"github.com/cloudwego/kitex-tests/common"
"github.com/cloudwego/kitex-tests/pkg/test"
"github.com/cloudwego/kitex-tests/thrift_streaming/kitex_gen/echo"
"github.com/cloudwego/kitex-tests/thrift_streaming/kitex_gen/echo/echoservice"
)

/*
Expand Down Expand Up @@ -113,13 +114,19 @@ func (t *testTracer) Finish(ctx context.Context) {
return
}

// finishCheck should be called for non-unary
func (tr *testTracer) finishCheck(t *testing.T, info string) {
tr.wg.Wait()
test.Assert(t, tr.finishCalled, tr)
tr.finishCalledCheck(t)
test.Assert(t, tr.sendSize == tr.finishSendSize, tr.sendSize, tr.finishSendSize, info)
test.Assert(t, tr.recvSize == tr.finishRecvSize, tr.recvSize, tr.finishRecvSize, info)
}

// finishCalledCheck should be invoked for unary
func (tr *testTracer) finishCalledCheck(t *testing.T) {
tr.wg.Wait()
test.Assert(t, tr.finishCalled, tr)
}

var _ streaming.Stream = (*wrapStream)(nil)

type wrapStream struct {
Expand Down Expand Up @@ -205,6 +212,8 @@ func TestTracerNormalEndOfStream(t *testing.T) {
// save for server
test.Assert(t, serverTracer.recvSize == 0 && serverTracer.sendSize == 0, serverTracer)
test.Assert(t, serverTracer.finishSendSize > 0 && serverTracer.finishRecvSize > 0, serverTracer)
clientTracer.finishCalledCheck(t)
serverTracer.finishCalledCheck(t)
})

t.Run("bidirectional api", func(t *testing.T) {
Expand Down Expand Up @@ -330,7 +339,8 @@ func TestTracerNormalEndOfStream(t *testing.T) {
test.Assert(t, clientTracer.sendCount == count, clientTracer)
test.Assert(t, clientTracer.recvCount == 1, clientTracer)
// regardless of whether wrapped stream implements WithDoFinish, it's done within client.stream.RecvMsg
test.Assert(t, clientTracer.finishCalled, clientTracer)
clientTracer.finishCheck(t, "client")
serverTracer.finishCheck(t, "server")
})

t.Run("client streaming with wrapped stream with DoFinish", func(t *testing.T) {
Expand Down Expand Up @@ -365,8 +375,8 @@ func TestTracerNormalEndOfStream(t *testing.T) {
test.Assert(t, err == nil, err)
test.Assert(t, clientTracer.sendCount == count, clientTracer)
test.Assert(t, clientTracer.recvCount == 1, clientTracer)
// wrapped stream doesn't implement WithDoFinish, so the DoFinish won't be called
test.Assert(t, clientTracer.finishCalled, clientTracer)
clientTracer.finishCheck(t, "client")
serverTracer.finishCheck(t, "server")
})
}

Expand Down Expand Up @@ -460,8 +470,8 @@ func TestTracingServerReturnError(t *testing.T) {
test.Assert(t, clientTracer.recvCount == 0, clientTracer)
test.Assert(t, serverTracer.sendCount == 0, serverTracer)
test.Assert(t, serverTracer.recvCount == 0, serverTracer)
test.Assert(t, clientTracer.finishCalled, clientTracer)
test.Assert(t, serverTracer.finishCalled, serverTracer)
clientTracer.finishCalledCheck(t)
serverTracer.finishCalledCheck(t)
})

t.Run("server", func(t *testing.T) {
Expand Down Expand Up @@ -545,8 +555,8 @@ func TestTracingServerReturnBizError(t *testing.T) {
test.Assert(t, clientTracer.recvCount == 0, clientTracer)
test.Assert(t, serverTracer.sendCount == 0, serverTracer)
test.Assert(t, serverTracer.recvCount == 0, serverTracer)
test.Assert(t, clientTracer.finishCalled, clientTracer)
test.Assert(t, serverTracer.finishCalled, serverTracer)
clientTracer.finishCalledCheck(t)
serverTracer.finishCalledCheck(t)
})

// Waiting for fix for streaming apis with biz error
Expand Down Expand Up @@ -635,7 +645,7 @@ func TestTracingClientTimeout(t *testing.T) {
// recv/send event is not reported to tracer for unary api
test.Assert(t, clientTracer.sendCount == 0, clientTracer)
test.Assert(t, clientTracer.recvCount == 0, clientTracer)
test.Assert(t, clientTracer.finishCalled, clientTracer)
clientTracer.finishCalledCheck(t)
})

t.Run("server", func(t *testing.T) {
Expand Down Expand Up @@ -702,7 +712,7 @@ func TestTracingServerStop(t *testing.T) {
// recv/send event is not reported to tracer for unary api
test.Assert(t, clientTracer.sendCount == 0, clientTracer)
test.Assert(t, clientTracer.recvCount == 0, clientTracer)
test.Assert(t, clientTracer.finishCalled, clientTracer)
clientTracer.finishCalledCheck(t)
})

t.Run("server", func(t *testing.T) {
Expand Down

0 comments on commit e347060

Please sign in to comment.