Skip to content

Commit

Permalink
Merge pull request cloudwego#1638 from cloudwego/release-v0.12.0
Browse files Browse the repository at this point in the history
chore: release v0.12.0
  • Loading branch information
Marina-Sakai authored Dec 10, 2024
2 parents 2313418 + 3f2df52 commit 0f3df4d
Show file tree
Hide file tree
Showing 241 changed files with 15,245 additions and 2,544 deletions.
31 changes: 3 additions & 28 deletions .github/workflows/pr-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ on: [ pull_request ]

jobs:
compliant:
runs-on: [ self-hosted, X64 ]
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4

- name: Check License Header
uses: apache/skywalking-eyes/[email protected]
Expand All @@ -16,32 +16,7 @@ jobs:
- name: Check Spell
uses: crate-ci/[email protected]

staticcheck:
runs-on: [ self-hosted, X64 ]
steps:
- uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: stable
# For self-hosted, the cache path is shared across projects
# and it works well without the cache of github actions
# Enable it if we're going to use Github only
cache: false

- uses: reviewdog/action-staticcheck@v1
with:
github_token: ${{ secrets.github_token }}
# Change reviewdog reporter if you need [github-pr-check,github-check,github-pr-review].
reporter: github-pr-review
# Report all results.
filter_mode: nofilter
# Exit with 1 when it finds at least one finding.
fail_on_error: true
# Set staticcheck flags
staticcheck_flags: -checks=inherit,-SA1029

lint:
golangci-lint:
runs-on: [ self-hosted, X64 ]
steps:
- uses: actions/checkout@v4
Expand Down
25 changes: 0 additions & 25 deletions .github/workflows/release-check.yml

This file was deleted.

13 changes: 9 additions & 4 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ on: [ push, pull_request ]

jobs:
unit-scenario-test:
runs-on: ubuntu-latest
runs-on: [ self-hosted, X64 ]
steps:
- uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: '1.21'
go-version: stable
cache: false
- name: Scenario Tests
run: |
cd ..
Expand Down Expand Up @@ -47,7 +48,7 @@ jobs:
go-version: ${{ matrix.go }}
cache: false # don't use cache for self-hosted runners
- name: Unit Test
run: go test -race -covermode=atomic ./...
run: go test -race ./...

codegen-test:
runs-on: ubuntu-latest
Expand All @@ -63,11 +64,12 @@ jobs:
go install ./tool/cmd/kitex
LOCAL_REPO=$(pwd)
cd ..
git clone https://github.com/cloudwego/kitex-tests.git
git clone https://github.com/cloudwego/kitex-tests.git
cd kitex-tests/codegen
go mod init codegen-test
go mod edit -replace=github.com/apache/thrift=github.com/apache/[email protected]
go mod edit -replace github.com/cloudwego/kitex=${LOCAL_REPO}
go mod edit -replace github.com/cloudwego/kitex/pkg/protocol/bthrift=${LOCAL_REPO}/pkg/protocol/bthrift
go mod tidy
bash -version
bash ./codegen_install_check.sh
Expand All @@ -79,6 +81,9 @@ jobs:
windows-test:
runs-on: windows-latest
env: # Fixes https://github.com/actions/setup-go/issues/240
GOMODCACHE: 'D:\go\pkg\mod'
GOCACHE: 'D:\go\go-build'
steps:
- uses: actions/checkout@v4
- name: Set up Go
Expand Down
34 changes: 30 additions & 4 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"strconv"
"sync/atomic"

"github.com/cloudwego/kitex/pkg/streamx"

"github.com/bytedance/gopkg/cloud/metainfo"
"github.com/cloudwego/localsession/backup"

Expand Down Expand Up @@ -70,8 +72,14 @@ type kClient struct {
mws []endpoint.Middleware
eps endpoint.Endpoint
sEps endpoint.Endpoint
opt *client.Options
lbf *lbcache.BalancerFactory

// streamx
sxStreamMW streamx.StreamMiddleware
sxStreamRecvMW streamx.StreamRecvMiddleware
sxStreamSendMW streamx.StreamSendMiddleware

opt *client.Options
lbf *lbcache.BalancerFactory

inited bool
closed bool
Expand Down Expand Up @@ -424,21 +432,30 @@ func (kc *kClient) richRemoteOption() {
// (newClientStreamer: call WriteMeta before remotecli.NewClient)
transInfoHdlr := bound.NewTransMetaHandler(kc.opt.MetaHandlers)
kc.opt.RemoteOpt.PrependBoundHandler(transInfoHdlr)

// add meta handlers into streaming meta handlers
for _, h := range kc.opt.MetaHandlers {
if shdlr, ok := h.(remote.StreamingMetaHandler); ok {
kc.opt.RemoteOpt.StreamingMetaHandlers = append(kc.opt.RemoteOpt.StreamingMetaHandlers, shdlr)
}
}
}
}

func (kc *kClient) buildInvokeChain() error {
mwchain := endpoint.Chain(kc.mws...)

innerHandlerEp, err := kc.invokeHandleEndpoint()
if err != nil {
return err
}
kc.eps = endpoint.Chain(kc.mws...)(innerHandlerEp)
kc.eps = mwchain(innerHandlerEp)

innerStreamingEp, err := kc.invokeStreamingEndpoint()
if err != nil {
return err
}
kc.sEps = endpoint.Chain(kc.mws...)(innerStreamingEp)
kc.sEps = mwchain(innerStreamingEp)
return nil
}

Expand Down Expand Up @@ -720,6 +737,9 @@ func initRPCInfo(ctx context.Context, method string, opt *client.Options, svcInf
rpcStats.ImmutableView(),
)

if mi != nil {
ri.Invocation().(rpcinfo.InvocationSetter).SetStreamingMode(mi.StreamingMode())
}
if fromMethod := ctx.Value(consts.CtxKeyMethod); fromMethod != nil {
rpcinfo.AsMutableEndpointInfo(ri.From()).SetMethod(fromMethod.(string))
}
Expand All @@ -732,6 +752,12 @@ func initRPCInfo(ctx context.Context, method string, opt *client.Options, svcInf
}
}

// streamx config
sopt := opt.StreamX
if sopt.RecvTimeout > 0 {
cfg.SetStreamRecvTimeout(sopt.RecvTimeout)
}

ctx = rpcinfo.NewCtxWithRPCInfo(ctx, ri)

if callOpts != nil && callOpts.CompressorName != "" {
Expand Down
86 changes: 86 additions & 0 deletions client/client_streamx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package client

import (
"context"

"github.com/cloudwego/kitex/client/streamxclient/streamxcallopt"
istreamxclient "github.com/cloudwego/kitex/internal/streamx/streamxclient"
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/cloudwego/kitex/pkg/streamx"
)

// NewStream create stream for streamx mode
func (kc *kClient) NewStream(ctx context.Context, method string, streamArgs streamx.StreamArgs, callOptions ...streamxcallopt.CallOption) (context.Context, streamx.ClientStream, error) {
if !kc.inited {
panic("client not initialized")
}
if kc.closed {
panic("client is already closed")
}
if ctx == nil {
panic("ctx is nil")
}
var ri rpcinfo.RPCInfo
ctx, ri, _ = kc.initRPCInfo(ctx, method, 0, nil)

err := rpcinfo.AsMutableRPCConfig(ri.Config()).SetInteractionMode(rpcinfo.Streaming)
if err != nil {
return nil, nil, err
}
ctx = rpcinfo.NewCtxWithRPCInfo(ctx, ri)

// tracing
ctx = kc.opt.TracerCtl.DoStart(ctx, ri)
ctx, copts := istreamxclient.NewCtxWithCallOptions(ctx)
callOptions = append(callOptions, istreamxclient.WithStreamCloseCallback(func(nErr error) {
kc.opt.TracerCtl.DoFinish(ctx, ri, nErr)
}))
copts.Apply(callOptions)

if msargs := streamx.AsMutableStreamArgs(streamArgs); msargs != nil {
msargs.SetStreamMiddleware(kc.sxStreamMW)

eventHandler := kc.opt.TracerCtl.GetStreamEventHandler()
if eventHandler == nil {
msargs.SetStreamRecvMiddleware(kc.sxStreamRecvMW)
msargs.SetStreamSendMiddleware(kc.sxStreamSendMW)
} else {
traceRecvMW := streamx.NewStreamRecvStatMiddleware(ctx, eventHandler)
traceSendMW := streamx.NewStreamSendStatMiddleware(ctx, eventHandler)
if kc.sxStreamRecvMW == nil {
msargs.SetStreamRecvMiddleware(traceRecvMW)
} else {
msargs.SetStreamRecvMiddleware(streamx.StreamRecvMiddlewareChain(traceRecvMW, kc.sxStreamRecvMW))
}
if kc.sxStreamSendMW == nil {
msargs.SetStreamSendMiddleware(traceSendMW)
} else {
msargs.SetStreamSendMiddleware(streamx.StreamSendMiddlewareChain(traceSendMW, kc.sxStreamSendMW))
}
}
}
// with streamx mode, req is nil and resp is streamArgs
// it's an ugly trick but if we don't want to refactor too much,
// this is the only way to compatible with current endpoint API.
err = kc.sEps(ctx, nil, streamArgs)
if err != nil {
return nil, nil, err
}
return ctx, streamArgs.Stream().(streamx.ClientStream), nil
}
8 changes: 3 additions & 5 deletions client/genericclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ func NewClient(destService string, g generic.Generic, opts ...client.Option) (Cl

// NewClientWithServiceInfo create a generic client with serviceInfo
func NewClientWithServiceInfo(destService string, g generic.Generic, svcInfo *serviceinfo.ServiceInfo, opts ...client.Option) (Client, error) {
if isDeprecated, ok := svcInfo.Extra[generic.DeprecatedGenericServiceInfoAPIKey].(bool); ok && isDeprecated {
svcInfo.Methods, svcInfo.ServiceName = generic.GetMethodInfo(g.MessageReaderWriter(), g.IDLServiceName())
}
var options []client.Option
options = append(options, client.WithGeneric(g))
options = append(options, client.WithDestService(destService))
Expand Down Expand Up @@ -97,7 +94,8 @@ type genericServiceClient struct {

func (gc *genericServiceClient) GenericCall(ctx context.Context, method string, request interface{}, callOptions ...callopt.Option) (response interface{}, err error) {
ctx = client.NewCtxWithCallOptions(ctx, callOptions)
_args := gc.svcInfo.MethodInfo(method).NewArgs().(*generic.Args)
mtInfo := gc.svcInfo.MethodInfo(method)
_args := mtInfo.NewArgs().(*generic.Args)
_args.Method = method
_args.Request = request

Expand All @@ -109,7 +107,7 @@ func (gc *genericServiceClient) GenericCall(ctx context.Context, method string,
return nil, gc.kClient.Call(ctx, mt.Name, _args, nil)
}

_result := gc.svcInfo.MethodInfo(method).NewResult().(*generic.Result)
_result := mtInfo.NewResult().(*generic.Result)
if err = gc.kClient.Call(ctx, mt.Name, _args, _result); err != nil {
return
}
Expand Down
7 changes: 6 additions & 1 deletion client/genericclient/generic_stream_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/cloudwego/kitex/pkg/serviceinfo"
)

func streamingServiceInfo(g generic.Generic) *serviceinfo.ServiceInfo {
func StreamingServiceInfo(g generic.Generic) *serviceinfo.ServiceInfo {
return newClientStreamingServiceInfo(g)
}

Expand Down Expand Up @@ -100,5 +100,10 @@ func newClientStreamingServiceInfo(g generic.Generic) *serviceinfo.ServiceInfo {
Extra: make(map[string]interface{}),
}
svcInfo.Extra["generic"] = true
if extra, ok := g.(generic.ExtraProvider); ok {
if extra.GetExtra(generic.CombineServiceKey) == "true" {
svcInfo.Extra["combine_service"] = true
}
}
return svcInfo
}
1 change: 1 addition & 0 deletions client/genericclient/generic_stream_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func TestGenericStreamService(t *testing.T) {

svcInfo := newClientStreamingServiceInfo(g)
test.Assert(t, svcInfo.Extra["generic"] == true)
test.Assert(t, svcInfo.Extra["combine_service"] == nil)
svcInfo.GenericMethod = func(name string) serviceinfo.MethodInfo {
return svcInfo.Methods[name]
}
Expand Down
Loading

0 comments on commit 0f3df4d

Please sign in to comment.