Skip to content
This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Commit

Permalink
feat(broker): treat non-CloudEvent reply as successful delivery (#2073)
Browse files Browse the repository at this point in the history
  • Loading branch information
eclipselu authored Jan 18, 2021
1 parent a1053ae commit 1978539
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 16 deletions.
8 changes: 8 additions & 0 deletions pkg/broker/handler/processors/deliver/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"net/http"
"net/url"
"strings"
"time"

"github.com/cloudevents/sdk-go/v2/binding"
Expand Down Expand Up @@ -176,6 +177,13 @@ func (p *Processor) deliver(ctx context.Context, target *config.Target, broker *
return fmt.Errorf("event delivery failed: HTTP status code %d", resp.StatusCode)
}

// Pre-check the reply response header, if it's not in structured mode/batched mode or binary mode,
// then it's not a CloudEvent, we treat the delivery as successful and ignore the response.
// Otherwise, we proceed with the malformed event check.
if !strings.HasPrefix(resp.Header.Get("Content-Type"), "application/cloudevents") && (resp.Header.Get("ce-specversion") == "") {
return nil
}

respMsg := cehttp.NewMessageFromHttpResponse(resp)
if respMsg.ReadEncoding() == binding.EncodingUnknown {
// If the response code is 2xx and has a body but the encoding is unknown,
Expand Down
43 changes: 27 additions & 16 deletions pkg/broker/handler/processors/deliver/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,11 @@ func TestDeliverSuccess(t *testing.T) {
}

type targetWithFailureHandler struct {
t *testing.T
delay time.Duration
respCode int
malFormedEvent bool
t *testing.T
delay time.Duration
nonCloudEventReply bool
respCode int
respBody string
}

func (h *targetWithFailureHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
Expand All @@ -218,10 +219,15 @@ func (h *targetWithFailureHandler) ServeHTTP(w http.ResponseWriter, req *http.Re

time.Sleep(h.delay)

if h.malFormedEvent {
w.Write([]byte("not an valid event body"))
w.Header().Set("ce-specversion", "1.0")
if h.nonCloudEventReply {
// missing ce-specversion header
w.Header().Del("ce-specversion")
// Content-Type does not start with `application/cloudevents`
w.Header().Set("Content-Type", "text/html")
}
w.WriteHeader(h.respCode)
w.Write([]byte(h.respBody))
}

func TestDeliverFailure(t *testing.T) {
Expand All @@ -239,6 +245,7 @@ func TestDeliverFailure(t *testing.T) {
name: "delivery error retry success",
targetHandler: &targetWithFailureHandler{respCode: http.StatusInternalServerError},
withRetry: true,
wantErr: false,
}, {
name: "delivery error retry failure",
targetHandler: &targetWithFailureHandler{respCode: http.StatusInternalServerError},
Expand All @@ -253,16 +260,27 @@ func TestDeliverFailure(t *testing.T) {
name: "delivery timeout retry success",
targetHandler: &targetWithFailureHandler{delay: time.Second, respCode: http.StatusOK},
withRetry: true,
wantErr: false,
}, {
name: "delivery timeout retry failure",
withRetry: true,
targetHandler: &targetWithFailureHandler{delay: time.Second, respCode: http.StatusOK},
failRetry: true,
wantErr: true,
}, {
name: "malformed reply failure",
name: "malformed CloudEvent reply failure",
// Return 2xx but with a malformed event should be considered error.
targetHandler: &targetWithFailureHandler{respCode: http.StatusOK, malFormedEvent: true},
targetHandler: &targetWithFailureHandler{respCode: http.StatusOK, respBody: "not a valid reply body"},
wantErr: true,
}, {
name: "non-CloudEvent reply success",
// a non-CloudEvent reply with 2xx status code should be considered delivery success.
targetHandler: &targetWithFailureHandler{respCode: http.StatusOK, respBody: "reply body", nonCloudEventReply: true},
wantErr: false,
}, {
name: "non-CloudEvent reply failure",
// a non-CloudEvent reply with non-2xx status code should be considered delivery failure.
targetHandler: &targetWithFailureHandler{respCode: http.StatusBadRequest, respBody: "reply body", nonCloudEventReply: true},
wantErr: true,
}}

Expand Down Expand Up @@ -626,14 +644,6 @@ func benchmarkRetry(b *testing.B, httpClient *http.Client, targetAddress string,
})
}

func toFakePubsubMessage(m *pstest.Message) *pubsub.Message {
return &pubsub.Message{
ID: m.ID,
Attributes: m.Attributes,
Data: m.Data,
}
}

func testPubsubClient(ctx context.Context, t testing.TB, projectID string) (*pstest.Server, *pubsub.Client, func()) {
t.Helper()
srv := pstest.NewServer()
Expand All @@ -657,6 +667,7 @@ func newSampleEvent() *event.Event {
sampleEvent.SetID("id")
sampleEvent.SetSource("source")
sampleEvent.SetSubject("subject")
sampleEvent.SetSpecVersion("1.0")
sampleEvent.SetType("type")
sampleEvent.SetTime(time.Now())
return &sampleEvent
Expand Down

0 comments on commit 1978539

Please sign in to comment.