Skip to content

Commit 3ecb2ae

Browse files
jayantxieHeyJavaBean
authored andcommitted
feat: set crrst flag on response header to ensure kitex client won't reuse bad connections (cloudwego#1653)
1 parent d3d910c commit 3ecb2ae

File tree

1 file changed

+11
-4
lines changed

1 file changed

+11
-4
lines changed

pkg/remote/trans/default_server_handler.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ func (t *svrTransHandler) OnRead(ctx context.Context, conn net.Conn) (err error)
187187
recvMsg.SetPayloadCodec(t.opt.PayloadCodec)
188188
ctx, err = t.transPipe.Read(ctx, conn, recvMsg)
189189
if err != nil {
190-
t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, true)
190+
t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, true, true)
191191
// t.OnError(ctx, err, conn) will be executed at outer function when transServer close the conn
192192
return err
193193
}
@@ -203,7 +203,7 @@ func (t *svrTransHandler) OnRead(ctx context.Context, conn net.Conn) (err error)
203203
var methodInfo serviceinfo.MethodInfo
204204
if methodInfo, err = GetMethodInfo(ri, svcInfo); err != nil {
205205
// it won't be error, because the method has been checked in decode, err check here just do defensive inspection
206-
t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, true)
206+
t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, true, true)
207207
// for proxy case, need read actual remoteAddr, error print must exec after writeErrorReplyIfNeeded,
208208
// t.OnError(ctx, err, conn) will be executed at outer function when transServer close the conn
209209
return err
@@ -219,7 +219,7 @@ func (t *svrTransHandler) OnRead(ctx context.Context, conn net.Conn) (err error)
219219
// error cannot be wrapped to print here, so it must exec before NewTransError
220220
t.OnError(ctx, err, conn)
221221
err = remote.NewTransError(remote.InternalError, err)
222-
if closeConn := t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, false); closeConn {
222+
if closeConn := t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, false, false); closeConn {
223223
return err
224224
}
225225
// connection don't need to be closed when the error is return by the server handler
@@ -288,7 +288,7 @@ func (t *svrTransHandler) SetPipeline(p *remote.TransPipeline) {
288288
}
289289

290290
func (t *svrTransHandler) writeErrorReplyIfNeeded(
291-
ctx context.Context, recvMsg remote.Message, conn net.Conn, err error, ri rpcinfo.RPCInfo, doOnMessage bool,
291+
ctx context.Context, recvMsg remote.Message, conn net.Conn, err error, ri rpcinfo.RPCInfo, doOnMessage, connReset bool,
292292
) (shouldCloseConn bool) {
293293
if cn, ok := conn.(remote.IsActive); ok && !cn.IsActive() {
294294
// conn is closed, no need reply
@@ -313,6 +313,13 @@ func (t *svrTransHandler) writeErrorReplyIfNeeded(
313313
// if error happen before normal OnMessage, exec it to transfer header trans info into rpcinfo
314314
t.transPipe.OnMessage(ctx, recvMsg, errMsg)
315315
}
316+
if connReset {
317+
// if connection needs to be closed, set ConnResetTag to response header
318+
// to ensure the client won't reuse the connection.
319+
if ei := rpcinfo.AsTaggable(ri.To()); ei != nil {
320+
ei.SetTag(rpcinfo.ConnResetTag, "1")
321+
}
322+
}
316323
ctx, err = t.transPipe.Write(ctx, conn, errMsg)
317324
if err != nil {
318325
klog.CtxErrorf(ctx, "KITEX: write error reply failed, remote=%s, error=%s", conn.RemoteAddr(), err.Error())

0 commit comments

Comments
 (0)