Skip to content

Commit 4702afe

Browse files
authored
Fix http transport client blocking recv (#2744)
* reproduce blocking recv * fix blocking recv call on httpTransportClient * prevent race condition * refactoring
1 parent e032a6a commit 4702afe

File tree

2 files changed

+88
-3
lines changed

2 files changed

+88
-3
lines changed

transport/http_client.go

+11-3
Original file line numberDiff line numberDiff line change
@@ -111,18 +111,26 @@ func (h *httpTransportClient) Recv(msg *Message) (err error) {
111111
var req *http.Request
112112

113113
if !h.dialOpts.Stream {
114-
rc, ok := <-h.req
114+
115+
var rc *http.Request
116+
var ok bool
117+
118+
h.Lock()
119+
select {
120+
case rc, ok = <-h.req:
121+
default:
122+
}
123+
115124
if !ok {
116-
h.Lock()
117125
if len(h.reqList) == 0 {
118126
h.Unlock()
119127
return io.EOF
120128
}
121129

122130
rc = h.reqList[0]
123131
h.reqList = h.reqList[1:]
124-
h.Unlock()
125132
}
133+
h.Unlock()
126134

127135
req = rc
128136
}

transport/http_client_test.go

+77
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package transport
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
7+
"github.com/pkg/errors"
8+
)
9+
10+
func TestHttpTransportClient(t *testing.T) {
11+
// arrange
12+
l, c, err := echoHttpTransportClient("127.0.0.1:")
13+
if err != nil {
14+
t.Error(err)
15+
}
16+
defer l.Close()
17+
defer c.Close()
18+
19+
// act + assert
20+
N := cap(c.req)
21+
// Send N+1 messages to overflow the buffered channel and place the extra message in the internal buffer
22+
for i := 0; i < N+1; i++ {
23+
body := fmt.Sprintf("msg-%d", i)
24+
if err := c.Send(&Message{Body: []byte(body)}); err != nil {
25+
t.Errorf("Unexpected send err: %v", err)
26+
}
27+
}
28+
29+
// consume all requests from the buffered channel
30+
for i := 0; i < N; i++ {
31+
msg := Message{}
32+
if err := c.Recv(&msg); err != nil {
33+
t.Errorf("Unexpected recv err: %v", err)
34+
}
35+
}
36+
37+
if len(c.reqList) != 1 {
38+
t.Error("Unexpected reqList")
39+
}
40+
41+
msg := Message{}
42+
if err := c.Recv(&msg); err != nil {
43+
t.Errorf("Unexpected recv err: %v", err)
44+
}
45+
want := fmt.Sprintf("msg-%d", N)
46+
got := string(msg.Body)
47+
if want != got {
48+
t.Errorf("Unexpected message: got %q, want %q", got, want)
49+
}
50+
}
51+
52+
func echoHttpTransportClient(addr string) (*httpTransportListener, *httpTransportClient, error) {
53+
tr := NewHTTPTransport()
54+
l, err := tr.Listen(addr)
55+
if err != nil {
56+
return nil, nil, errors.Errorf("Unexpected listen err: %v", err)
57+
}
58+
c, err := tr.Dial(l.Addr())
59+
if err != nil {
60+
return nil, nil, errors.Errorf("Unexpected dial err: %v", err)
61+
}
62+
go l.Accept(echoHandler)
63+
return l.(*httpTransportListener), c.(*httpTransportClient), nil
64+
}
65+
66+
func echoHandler(sock Socket) {
67+
defer sock.Close()
68+
for {
69+
var msg Message
70+
if err := sock.Recv(&msg); err != nil {
71+
return
72+
}
73+
if err := sock.Send(&msg); err != nil {
74+
return
75+
}
76+
}
77+
}

0 commit comments

Comments
 (0)