Skip to content

Commit

Permalink
fix: make concurrent retry tests run sequentially (#46)
Browse files Browse the repository at this point in the history
  • Loading branch information
Felix021 authored Jan 11, 2024
1 parent d09f7be commit 10d932d
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 69 deletions.
2 changes: 1 addition & 1 deletion run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ packages=(
for pkg in ${packages[@]}
do
if [ "$pkg" == "./thrift_streaming/..." ]; then
./thrift_streaming/generate.sh
LOCAL_REPO=$LOCAL_REPO ./thrift_streaming/generate.sh
fi
if [[ -n $LOCAL_REPO ]]; then
go test -covermode=atomic -coverprofile=${LOCAL_REPO}/coverage.txt.tmp -coverpkg=github.com/cloudwego/kitex/... $pkg
Expand Down
138 changes: 70 additions & 68 deletions thriftrpc/retrycall/retrycall_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,81 +200,83 @@ func TestNoCB(t *testing.T) {
}
}

func TestNoRetry(t *testing.T) {
atomic.StoreInt32(&testSTReqCount, -1)
// retry config
fp := retry.NewFailurePolicy()
fp.StopPolicy.CBPolicy.ErrorRate = 0.3
var opts []client.Option
opts = append(opts,
client.WithFailureRetry(fp),
client.WithRPCTimeout(20*time.Millisecond),
)
cli = getKitexClient(transport.Framed, opts...)

ctx, stReq := thriftrpc.CreateSTRequest(context.Background())
// add a mark to avoid retry
ctx = metainfo.WithPersistentValue(ctx, retry.TransitKey, strconv.Itoa(1))
ctx = metainfo.WithValue(ctx, skipCounterSleepKey, "1") // do not sleep by global variable
func TestRetry(t *testing.T) {
t.Run("TestNoRetry", func(t *testing.T) {
atomic.StoreInt32(&testSTReqCount, -1)
// retry config
fp := retry.NewFailurePolicy()
fp.StopPolicy.CBPolicy.ErrorRate = 0.3
var opts []client.Option
opts = append(opts,
client.WithFailureRetry(fp),
client.WithRPCTimeout(20*time.Millisecond),
)
cli = getKitexClient(transport.Framed, opts...)

for i := 0; i < 250; i++ {
reqCtx := ctx
if i%10 == 0 {
reqCtx = metainfo.WithValue(ctx, sleepTimeMsKey, "100")
ctx, stReq := thriftrpc.CreateSTRequest(context.Background())
// add a mark to avoid retry
ctx = metainfo.WithPersistentValue(ctx, retry.TransitKey, strconv.Itoa(1))
ctx = metainfo.WithValue(ctx, skipCounterSleepKey, "1") // do not sleep by global variable

for i := 0; i < 250; i++ {
reqCtx := ctx
if i%10 == 0 {
reqCtx = metainfo.WithValue(ctx, sleepTimeMsKey, "100")
}
stResp, err := cli.TestSTReq(reqCtx, stReq)
if i%10 == 0 {
test.Assert(t, err != nil)
test.Assert(t, strings.Contains(err.Error(), retryChainStopStr), err)
} else {
test.Assert(t, err == nil, err, i)
test.Assert(t, stReq.Str == stResp.Str)
}
}
stResp, err := cli.TestSTReq(reqCtx, stReq)
if i%10 == 0 {
test.Assert(t, err != nil)
test.Assert(t, strings.Contains(err.Error(), retryChainStopStr), err)
} else {
test.Assert(t, err == nil, err, i)
})

t.Run("TestBackupRequest", func(t *testing.T) {
atomic.StoreInt32(&testSTReqCount, -1)
// retry config
bp := retry.NewBackupPolicy(5)
var opts []client.Option
opts = append(opts,
client.WithBackupRequest(bp),
client.WithRPCTimeout(40*time.Millisecond),
)
cli = getKitexClient(transport.Framed, opts...)
for i := 0; i < 300; i++ {
ctx, stReq := thriftrpc.CreateSTRequest(context.Background())
stReq.Int64 = int64(i)
stResp, err := cli.TestSTReq(ctx, stReq)
test.Assert(t, err == nil, err, i, testSTReqCount)
test.Assert(t, stReq.Str == stResp.Str)
}
}
}

func TestBackupRequest(t *testing.T) {
atomic.StoreInt32(&testSTReqCount, -1)
// retry config
bp := retry.NewBackupPolicy(5)
var opts []client.Option
opts = append(opts,
client.WithBackupRequest(bp),
client.WithRPCTimeout(40*time.Millisecond),
)
cli = getKitexClient(transport.Framed, opts...)
for i := 0; i < 300; i++ {
ctx, stReq := thriftrpc.CreateSTRequest(context.Background())
stReq.Int64 = int64(i)
stResp, err := cli.TestSTReq(ctx, stReq)
test.Assert(t, err == nil, err, i, testSTReqCount)
test.Assert(t, stReq.Str == stResp.Str)
}
}
})

func TestServiceCB(t *testing.T) {
atomic.StoreInt32(&circuitBreakTestCount, -1)
// retry config
fp := retry.NewFailurePolicy()
var opts []client.Option
opts = append(opts, client.WithFailureRetry(fp))
opts = append(opts, client.WithRPCTimeout(50*time.Millisecond))
opts = append(opts, client.WithCircuitBreaker(circuitbreak.NewCBSuite(genCBKey)))
cli = getKitexClient(transport.TTHeader, opts...)
t.Run("TestServiceCB", func(t *testing.T) {
atomic.StoreInt32(&circuitBreakTestCount, -1)
// retry config
fp := retry.NewFailurePolicy()
var opts []client.Option
opts = append(opts, client.WithFailureRetry(fp))
opts = append(opts, client.WithRPCTimeout(50*time.Millisecond))
opts = append(opts, client.WithCircuitBreaker(circuitbreak.NewCBSuite(genCBKey)))
cli = getKitexClient(transport.TTHeader, opts...)

ctx, stReq := thriftrpc.CreateSTRequest(context.Background())
cbCount := 0
for i := 0; i < 300; i++ {
stResp, err := cli.CircuitBreakTest(ctx, stReq)
if err != nil {
test.Assert(t, strings.Contains(err.Error(), "retry circuit break") ||
strings.Contains(err.Error(), "service circuitbreak"), err, i)
cbCount++
} else {
test.Assert(t, stReq.Str == stResp.Str)
ctx, stReq := thriftrpc.CreateSTRequest(context.Background())
cbCount := 0
for i := 0; i < 300; i++ {
stResp, err := cli.CircuitBreakTest(ctx, stReq)
if err != nil {
test.Assert(t, strings.Contains(err.Error(), "retry circuit break") ||
strings.Contains(err.Error(), "service circuitbreak"), err, i)
cbCount++
} else {
test.Assert(t, stReq.Str == stResp.Str)
}
}
}
test.Assert(t, cbCount == 200, cbCount)
test.Assert(t, cbCount == 200, cbCount)
})
}

func TestRetryWithSpecifiedResultRetry(t *testing.T) {
Expand Down

0 comments on commit 10d932d

Please sign in to comment.