Skip to content

Commit

Permalink
feat: implement skipDecoder to enable Frugal and FastCodec for standa…
Browse files Browse the repository at this point in the history
…rd Thrift Buffer Protocol (#67)
  • Loading branch information
DMwangnima authored Apr 18, 2024
1 parent 341cc3a commit 2c0d166
Show file tree
Hide file tree
Showing 4 changed files with 206 additions and 9 deletions.
3 changes: 3 additions & 0 deletions run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ kitex -module github.com/cloudwego/kitex-tests -I idl ./idl/multi_service_2.prot
test -d kitex_gen_slim && rm -rf kitex_gen_slim
kitex -module github.com/cloudwego/kitex-tests -thrift template=slim -gen-path kitex_gen_slim ./idl/stability.thrift

test -d kitex_gen_noDefSerdes && rm -rf kitex_gen_noDefSerdes
kitex -module github.com/cloudwego/kitex-tests -thrift no_default_serdes -gen-path kitex_gen_noDefSerdes ./idl/stability.thrift

# generate thrift streaming code
LOCAL_REPO=$LOCAL_REPO ./thrift_streaming/generate.sh
test -d grpc_gen && rm -rf grpc_gen
Expand Down
42 changes: 42 additions & 0 deletions thriftrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
"runtime"
"time"

stability_noDefSerdes "github.com/cloudwego/kitex-tests/kitex_gen_noDefSerdes/thrift/stability"
stservice_noDefSerdes "github.com/cloudwego/kitex-tests/kitex_gen_noDefSerdes/thrift/stability/stservice"

"github.com/apache/thrift/lib/go/thrift"
"github.com/bytedance/gopkg/cloud/metainfo"
"github.com/cloudwego/kitex-tests/kitex_gen/thrift/instparam"
Expand Down Expand Up @@ -73,6 +76,13 @@ func CreateSlimKitexClient(param *ClientInitParam, opts ...client.Option) stserv
return stservice_slim.MustNewClient(param.TargetServiceName, opts...)
}

// CreateNoDefSerdesKitexClient .
func CreateNoDefSerdesKitexClient(param *ClientInitParam, opts ...client.Option) stservice_noDefSerdes.Client {
opts = generateClientOptionsFromParam(param, opts...)

return stservice_noDefSerdes.MustNewClient(param.TargetServiceName, opts...)
}

// generateClientOptionsFromParam process ClientInitParam and add client.Option
func generateClientOptionsFromParam(param *ClientInitParam, opts ...client.Option) []client.Option {
if len(param.HostPorts) > 0 {
Expand Down Expand Up @@ -165,6 +175,38 @@ func CreateSlimSTRequest(ctx context.Context) (context.Context, *stability_slim.
return ctx, req
}

// CreateNoDefSerdesSTRequest .
func CreateNoDefSerdesSTRequest(ctx context.Context) (context.Context, *stability_noDefSerdes.STRequest) {
req := stability_noDefSerdes.NewSTRequest()
req.Name = "byted"
req.On = thrift.BoolPtr(true)
req.B = 10
req.Int16 = 10
req.Int32 = math.MaxInt32
req.Int64 = math.MaxInt64
req.D = 0.0
req.Str = utils.RandomString(100)
req.Bin = []byte{1, 'a', '*'}
req.StringMap = map[string]string{
"key1": utils.RandomString(100),
"key2": utils.RandomString(10),
}
req.StringList = []string{
utils.RandomString(10),
utils.RandomString(20),
utils.RandomString(30),
}
req.StringSet = []string{
utils.RandomString(10),
utils.RandomString(100),
}
req.E = stability_noDefSerdes.TestEnum_FIRST

ctx = metainfo.WithValue(ctx, "TK", "TV")
ctx = metainfo.WithPersistentValue(ctx, "PK", "PV")
return ctx, req
}

// CreateObjReq .
func CreateObjReq(ctx context.Context) (context.Context, *instparam.ObjReq) {
id := thrift.Int64Ptr(int64(rand.Intn(100)))
Expand Down
93 changes: 84 additions & 9 deletions thriftrpc/normalcall/normalcall_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,25 @@ import (
"github.com/cloudwego/kitex-tests/common"
"github.com/cloudwego/kitex-tests/kitex_gen/thrift/stability"
"github.com/cloudwego/kitex-tests/kitex_gen/thrift/stability/stservice"
stservice_noDefSerdes "github.com/cloudwego/kitex-tests/kitex_gen_noDefSerdes/thrift/stability/stservice"
stservice_slim "github.com/cloudwego/kitex-tests/kitex_gen_slim/thrift/stability/stservice"
"github.com/cloudwego/kitex-tests/pkg/test"
"github.com/cloudwego/kitex-tests/thriftrpc"
)

var (
cli stservice.Client
cliSlim stservice_slim.Client

addr = "127.0.0.1:9001"
disablePoolAddr = "127.0.0.1:9002"
muxAdr = "127.0.0.1:9003" // used in `muxcall`
slimFrugalAddr = "127.0.0.1:9004"
slimAddr = "127.0.0.1:9005"
serverTimeoutAddr = "127.0.0.1:9006"
cli stservice.Client
cliSlim stservice_slim.Client
cliNoDefSerdes stservice_noDefSerdes.Client

addr = "127.0.0.1:9001"
disablePoolAddr = "127.0.0.1:9002"
muxAdr = "127.0.0.1:9003" // used in `muxcall`
slimFrugalAddr = "127.0.0.1:9004"
slimAddr = "127.0.0.1:9005"
serverTimeoutAddr = "127.0.0.1:9006"
noDefSerdesFrugalAddr = "127.0.0.1:9007"
noDefSerdesFastCodecAddr = "127.0.0.1:9008"
)

func TestMain(m *testing.M) {
Expand All @@ -72,15 +76,29 @@ func TestMain(m *testing.M) {
Address: slimFrugalAddr,
}, nil, server.WithPayloadCodec(thrift.NewThriftCodecWithConfig(thrift.FrugalWrite|thrift.FrugalRead)))

noDefSerdesSvrWithFrugalConfigured := thriftrpc.RunServer(&thriftrpc.ServerInitParam{
Network: "tcp",
Address: noDefSerdesFrugalAddr,
}, nil, server.WithPayloadCodec(thrift.NewThriftCodecWithConfig(thrift.FrugalWrite|thrift.FrugalRead|thrift.EnableSkipDecoder)))

noDefSerdesSvcWithFastCodecConfigured := thriftrpc.RunServer(&thriftrpc.ServerInitParam{
Network: "tcp",
Address: noDefSerdesFastCodecAddr,
}, nil, server.WithPayloadCodec(thrift.NewThriftCodecWithConfig(thrift.FastWrite|thrift.FastRead|thrift.EnableSkipDecoder)))

common.WaitServer(addr)
common.WaitServer(slimAddr)
common.WaitServer(slimFrugalAddr)
common.WaitServer(noDefSerdesFrugalAddr)
common.WaitServer(noDefSerdesFastCodecAddr)

m.Run()

svr.Stop()
slimSvr.Stop()
slimSvrWithFrugalConfigured.Stop()
noDefSerdesSvrWithFrugalConfigured.Stop()
noDefSerdesSvcWithFastCodecConfigured.Stop()
}

func getKitexClient(p transport.Protocol, opts ...client.Option) stservice.Client {
Expand All @@ -101,6 +119,15 @@ func getSlimKitexClient(p transport.Protocol, hostPorts []string, opts ...client
}, opts...)
}

func getNoDefSerdesKitexClient(p transport.Protocol, hostPorts []string, opts ...client.Option) stservice_noDefSerdes.Client {
return thriftrpc.CreateNoDefSerdesKitexClient(&thriftrpc.ClientInitParam{
TargetServiceName: "cloudwego.kitex.testa.noDefSerdes",
HostPorts: hostPorts,
Protocol: p,
ConnMode: thriftrpc.LongConnection,
}, opts...)
}

func TestStTReq(t *testing.T) {
cli = getKitexClient(transport.PurePayload)

Expand Down Expand Up @@ -387,6 +414,54 @@ func TestCircuitBreakerCustomInstanceErrorTypeFunc(t *testing.T) {
test.Assert(t, fuseCount >= 100, fuseCount)
}

func TestNoDefaultSerdes(t *testing.T) {
testCases := []struct {
desc string
hostPorts []string
opts []client.Option
}{
{
desc: "use FastCodec, connect to Frugal and SkipDecoder enabled server",
hostPorts: []string{noDefSerdesFrugalAddr},
opts: []client.Option{
client.WithPayloadCodec(thrift.NewThriftCodecWithConfig(thrift.FastWrite | thrift.FastRead)),
},
},
{
desc: "use Frugal, connect to Frugal and SkipDecoder enabled server",
hostPorts: []string{noDefSerdesFrugalAddr},
opts: []client.Option{
client.WithPayloadCodec(thrift.NewThriftCodecWithConfig(thrift.FrugalWrite | thrift.FrugalRead)),
},
},
{
desc: "use FastCodec, connect to FastCodec and SkipDecoder enabled server",
hostPorts: []string{noDefSerdesFastCodecAddr},
opts: []client.Option{
client.WithPayloadCodec(thrift.NewThriftCodecWithConfig(thrift.FastWrite | thrift.FastRead)),
},
},
{
desc: "use Frugal, connect to Frugal and SkipDecoder enabled server",
hostPorts: []string{noDefSerdesFastCodecAddr},
opts: []client.Option{
client.WithPayloadCodec(thrift.NewThriftCodecWithConfig(thrift.FrugalWrite | thrift.FrugalRead)),
},
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
cliNoDefSerdes = getNoDefSerdesKitexClient(transport.PurePayload, tc.hostPorts, tc.opts...)
ctx, stReq := thriftrpc.CreateNoDefSerdesSTRequest(context.Background())
for i := 0; i < 3; i++ {
stResp, err := cliNoDefSerdes.TestSTReq(ctx, stReq)
test.Assert(t, err == nil, err)
test.Assert(t, stReq.Str == stResp.Str)
}
})
}
}

func BenchmarkThriftCall(b *testing.B) {
cli = getKitexClient(transport.TTHeader)
ctx, stReq := thriftrpc.CreateSTRequest(context.Background())
Expand Down
77 changes: 77 additions & 0 deletions thriftrpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import (
"github.com/cloudwego/kitex-tests/kitex_gen/thrift/instparam"
"github.com/cloudwego/kitex-tests/kitex_gen/thrift/stability"
"github.com/cloudwego/kitex-tests/kitex_gen/thrift/stability/stservice"
instparam_noDefSerdes "github.com/cloudwego/kitex-tests/kitex_gen_noDefSerdes/thrift/instparam"
stability_noDefSerdes "github.com/cloudwego/kitex-tests/kitex_gen_noDefSerdes/thrift/stability"
stservice_noDefSerdes "github.com/cloudwego/kitex-tests/kitex_gen_noDefSerdes/thrift/stability/stservice"
instparam_slim "github.com/cloudwego/kitex-tests/kitex_gen_slim/thrift/instparam"
stability_slim "github.com/cloudwego/kitex-tests/kitex_gen_slim/thrift/stability"
stservice_slim "github.com/cloudwego/kitex-tests/kitex_gen_slim/thrift/stability/stservice"
Expand Down Expand Up @@ -76,6 +79,22 @@ func RunSlimServer(param *ServerInitParam, handler stability_slim.STService, opt
return svr
}

// RunNoDefSerdesServer .
func RunNoDefSerdesServer(param *ServerInitParam, handler stability_noDefSerdes.STService, opts ...server.Option) server.Server {
opts = generateServerOptionsFromParam(param, opts...)
if handler == nil {
handler = new(STServiceNoDefSerdesHandler)
}
svr := stservice_noDefSerdes.NewServer(handler, opts...)

go func() {
if err := svr.Run(); err != nil {
panic(err)
}
}()
return svr
}

// generateServerOptionsFromParam process ServerInitParam and add server.Option
func generateServerOptionsFromParam(param *ServerInitParam, opts ...server.Option) []server.Option {
var addr net.Addr
Expand Down Expand Up @@ -229,3 +248,61 @@ func (S *STServiceSlimHandler) CircuitBreakTest(ctx context.Context, req *stabil
}
return resp, nil
}

type STServiceNoDefSerdesHandler struct{}

func (S *STServiceNoDefSerdesHandler) VisitOneway(ctx context.Context, req *stability_noDefSerdes.STRequest) (err error) {
// TODO implement me
panic("implement me")
}

func (S *STServiceNoDefSerdesHandler) TestSTReq(ctx context.Context, req *stability_noDefSerdes.STRequest) (r *stability_noDefSerdes.STResponse, err error) {
resp := &stability_noDefSerdes.STResponse{
Str: req.Str,
Mp: req.StringMap,
FlagMsg: req.FlagMsg,
}
if req.MockCost != nil {
if mockSleep, err := time.ParseDuration(*req.MockCost); err != nil {
return nil, err
} else {
time.Sleep(mockSleep)
}
}
return resp, nil
}

func (S *STServiceNoDefSerdesHandler) TestObjReq(ctx context.Context, req *instparam_noDefSerdes.ObjReq) (r *instparam_noDefSerdes.ObjResp, err error) {
resp := &instparam_noDefSerdes.ObjResp{
Msg: req.Msg,
MsgSet: req.MsgSet,
MsgMap: req.MsgMap,
FlagMsg: req.FlagMsg,
}
if req.MockCost != nil {
if mockSleep, err := time.ParseDuration(*req.MockCost); err != nil {
return nil, err
} else {
time.Sleep(mockSleep)
}
}
return resp, nil
}

func (S *STServiceNoDefSerdesHandler) TestException(ctx context.Context, req *stability_noDefSerdes.STRequest) (r *stability_noDefSerdes.STResponse, err error) {
err = &stability_noDefSerdes.STException{Message: "mock exception"}
return nil, err
}

func (S *STServiceNoDefSerdesHandler) CircuitBreakTest(ctx context.Context, req *stability_noDefSerdes.STRequest) (r *stability_noDefSerdes.STResponse, err error) {
// force 50% of the responses to cost over 200ms
if atomic.AddInt32(&countFlag, 1)%2 == 0 {
time.Sleep(200 * time.Millisecond)
}
resp := &stability_noDefSerdes.STResponse{
Str: req.Str,
Mp: req.StringMap,
FlagMsg: req.FlagMsg,
}
return resp, nil
}

0 comments on commit 2c0d166

Please sign in to comment.