Skip to content

Commit

Permalink
feat: add event handler tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jayantxie committed Feb 13, 2025
1 parent eb34aa9 commit f8c18e4
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 3 deletions.
25 changes: 22 additions & 3 deletions streamx/thrift/thrift_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"testing"
"time"

"github.com/cloudwego/kitex-tests/streamx"

"github.com/bytedance/gopkg/cloud/metainfo"
"github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/pkg/endpoint"
Expand Down Expand Up @@ -144,6 +146,12 @@ func (s *serviceImpl) EchoBidi(ctx context.Context, stream echo.TestService_Echo
if d != maxReceiveTimes {
return errors.New("send middleware builder call times is not maxReceiveTimes")
}
if *ri.Invocation().Extra("test_stream_send_event").(*int) != maxReceiveTimes {
return errors.New("send event call times is not maxReceiveTimes")
}
if *ri.Invocation().Extra("test_stream_recv_event").(*int) != maxReceiveTimes {
return errors.New("recv event call times is not maxReceiveTimes")
}
return nil
}
return err
Expand Down Expand Up @@ -200,10 +208,12 @@ func (s *serviceImpl) EchoClient(ctx context.Context, stream echo.TestService_Ec
if a != maxReceiveTimes+1 {
return errors.New("recv middleware call times is not maxReceiveTimes")
}

if c != maxReceiveTimes+1 {
return errors.New("recv middleware builder call times is not maxReceiveTimes")
}
if *ri.Invocation().Extra("test_stream_recv_event").(*int) != maxReceiveTimes {
return errors.New("recv event call times is not maxReceiveTimes")
}
err = stream.SendAndClose(ctx, &echo.EchoClientResponse{
Message: "pong",
})
Expand All @@ -216,6 +226,9 @@ func (s *serviceImpl) EchoClient(ctx context.Context, stream echo.TestService_Ec
if d != 1 {
return errors.New("send middleware builder call times is not 1")
}
if *ri.Invocation().Extra("test_stream_send_event").(*int) != 1 {
return errors.New("send event call times is not 1")
}
}
return err
}
Expand Down Expand Up @@ -275,12 +288,18 @@ func (s *serviceImpl) EchoServer(ctx context.Context, req *echo.EchoClientReques
if d != maxReceiveTimes {
return errors.New("send middleware builder call times is not maxReceiveTimes")
}
if *ri.Invocation().Extra("test_stream_recv_event").(*int) != 1 {
return errors.New("recv event call times is not 1")
}
if *ri.Invocation().Extra("test_stream_send_event").(*int) != maxReceiveTimes {
return errors.New("send event call times is not maxReceiveTimes")
}
return nil
}

func runServer(listenaddr string) server.Server {
addr, _ := net.ResolveTCPAddr("tcp", listenaddr)
svr := testservice.NewServer(&serviceImpl{}, server.WithServiceAddr(addr), server.WithExitWaitTime(1*time.Second),
svr := testservice.NewServer(&serviceImpl{}, server.WithServiceAddr(addr), server.WithExitWaitTime(1*time.Second), server.WithTracer(streamx.NewTracer()),
server.WithMetaHandler(transmeta.ServerTTHeaderHandler), server.WithMetaHandler(transmeta.ServerHTTP2Handler),
server.WithUnaryOptions(server.WithUnaryMiddleware(func(next endpoint.UnaryEndpoint) endpoint.UnaryEndpoint {
return func(ctx context.Context, req, resp interface{}) (err error) {
Expand Down Expand Up @@ -376,7 +395,7 @@ func runClient(t *testing.T, isTTHeaderStreaming bool) {
// ttheader streaming has higher priority than grpc streaming
prot |= transport.TTHeaderStreaming
}
cli := testservice.MustNewClient("service", client.WithHostPorts(thriftTestAddr),
cli := testservice.MustNewClient("service", client.WithHostPorts(thriftTestAddr), client.WithTracer(streamx.NewTracer()),
client.WithTransportProtocol(prot),
client.WithMetaHandler(transmeta.ClientHTTP2Handler), client.WithMetaHandler(transmeta.ClientTTHeaderHandler),
client.WithUnaryOptions(client.WithUnaryRPCTimeout(200*time.Millisecond),
Expand Down
42 changes: 42 additions & 0 deletions streamx/tracer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package streamx

import (
"context"

"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/cloudwego/kitex/pkg/stats"
)

func NewTracer() stats.Tracer {
return &tracer{}
}

type tracer struct {
}

func (t *tracer) Start(ctx context.Context) context.Context {
return ctx
}

func (t *tracer) Finish(ctx context.Context) {
return
}

func (t *tracer) ReportStreamEvent(ctx context.Context, ri rpcinfo.RPCInfo, event rpcinfo.Event) {
if event.Event() == stats.StreamSend {
if ri.Invocation().Extra("test_stream_send_event") == nil {
var count int
ri.Invocation().(rpcinfo.InvocationSetter).SetExtra("test_stream_send_event", &count)
}
count := ri.Invocation().Extra("test_stream_send_event").(*int)
*count++
} else {
if ri.Invocation().Extra("test_stream_recv_event") == nil {
var count int
ri.Invocation().(rpcinfo.InvocationSetter).SetExtra("test_stream_recv_event", &count)
}
count := ri.Invocation().Extra("test_stream_recv_event").(*int)
*count++
}
return
}

0 comments on commit f8c18e4

Please sign in to comment.