Skip to content

Block-wise transfers silently drops part of the payload after timeout #600

@palsivertsen

Description

@palsivertsen

I'm observing an issue with the server's handling of incoming block wise transfers when a request exceeds the block-wise timeout. Earlier blocks are silently dropped and requests are passed to handlers with an incomplete payload and no error.

Here's a test demonstrating the issue:

package coapblockwisetimeout_test

import (
	"bytes"
	"io"
	"log"
	"testing"
	"time"

	"github.com/plgd-dev/go-coap/v3/message"
	"github.com/plgd-dev/go-coap/v3/message/codes"
	"github.com/plgd-dev/go-coap/v3/mux"
	"github.com/plgd-dev/go-coap/v3/net"
	"github.com/plgd-dev/go-coap/v3/net/blockwise"
	"github.com/plgd-dev/go-coap/v3/options"
	"github.com/plgd-dev/go-coap/v3/udp"
	"github.com/plgd-dev/go-coap/v3/udp/server"
)

func TestBlockWiseTimeout(t *testing.T) {
	// parts of the payload is lost when the duration of a block transfer with
	// more than one block exceeds the blockwise timeout
	tests := []struct {
		lag              time.Duration
		blockwiseTimeout time.Duration
	}{
		// lag >= timeout fails
		{
			lag:              0,
			blockwiseTimeout: time.Second,
		},
		{
			lag:              900 * time.Millisecond,
			blockwiseTimeout: time.Second,
		},
		{
			lag:              time.Second,
			blockwiseTimeout: time.Second,
		},
		{
			lag:              100 * time.Millisecond,
			blockwiseTimeout: 100 * time.Millisecond,
		},
	}
	for _, tt := range tests {
		t.Run(tt.lag.String()+"_"+tt.blockwiseTimeout.String(), func(t *testing.T) {
			t.Parallel()

			var actualBody bytes.Buffer // handler writes payload here

			url := setupServer(t,
				options.WithMux(recBodyHandler(&actualBody)),
				options.WithBlockwise(true, blockwise.SZX16, tt.blockwiseTimeout),
			)

			conn, err := udp.Dial(url)
			if err != nil {
				t.Fatal("Error dialing: ", err)
			}
			t.Log("client:", conn.LocalAddr().String())

			// Manage messages manually to avoid running into:
			// https://github.com/plgd-dev/go-coap/issues/572
			// Send a put request with payload split into two requests.
			firstBlock := conn.AcquireMessage(t.Context())
			firstBlock.SetMessage(message.Message{
				Token: message.Token{123},
				Options: message.Options{{
					ID:    message.Block1,
					Value: []byte{0b0000_1_000}, // num 0, more true, szx 16
				}},
				Code:      codes.PUT,
				Payload:   []byte("first part      "), // 16 bytes
				MessageID: 0,
				Type:      message.Confirmable,
			})

			if err := conn.WriteMessage(firstBlock); err != nil {
				t.Fatal("put: ", err)
			}
			conn.ReleaseMessage(firstBlock)

			// add a delay between blocks
			time.Sleep(tt.lag)

			secondBlock := conn.AcquireMessage(t.Context())
			secondBlock.SetMessage(message.Message{
				Token: message.Token{123},
				Options: message.Options{{
					ID:    message.Block1,
					Value: []byte{0b0001_0_000}, // num 1, more false, szx 16
				}},
				Code:      codes.PUT,
				Payload:   []byte("second part"),
				MessageID: 1,
				Type:      message.Confirmable,
			})

			if err := conn.WriteMessage(secondBlock); err != nil {
				t.Fatal("put: ", err)
			}
			conn.ReleaseMessage(secondBlock)

			actualBodyStr := actualBody.String()
			expectedBody := "first part      second part"
			if actualBodyStr != expectedBody {
				t.Fatalf("unexpected body: %q, got: %q", actualBodyStr, expectedBody)
			}
		})
	}
}

// helpers

func setupServer(tb testing.TB, opts ...server.Option) string {
	tb.Helper()

	l, err := net.NewListenUDP("udp4", "localhost:0")
	if err != nil {
		tb.Fatalf("create listener: %s", err)
	}
	tb.Cleanup(func() {
		if errC := l.Close(); errC != nil && err == nil {
			err = errC
		}
	})

	s := udp.NewServer(opts...)
	tb.Cleanup(s.Stop)

	tb.Log("starting server:", l.LocalAddr().String())
	go func() {
		if err := s.Serve(l); err != nil {
			log.Println("serve:", err)
		}
	}()

	return l.LocalAddr().String()
}

func recBodyHandler(rec io.Writer) mux.HandlerFunc {
	return func(w mux.ResponseWriter, r *mux.Message) {
		// append the payload to rec
		if _, err := io.Copy(rec, r.Body()); err != nil {
			panic(err)
		}

		if err := w.SetResponse(codes.Valid, message.AppOctets, nil); err != nil {
			panic(err)
		}
	}
}

Test output:

--- FAIL: TestBlockWiseTimeout (0.00s)
    --- FAIL: TestBlockWiseTimeout/100ms_100ms (0.10s)
        bug_test.go:51: starting server: 127.0.0.1:36497
        bug_test.go:60: client: 127.0.0.1:54059
        bug_test.go:107: unexpected body: "second part", got: "first part      second part"
    --- FAIL: TestBlockWiseTimeout/1s_1s (1.00s)
        bug_test.go:51: starting server: 127.0.0.1:56725
        bug_test.go:60: client: 127.0.0.1:36002
        bug_test.go:107: unexpected body: "second part", got: "first part      second part"
FAIL
FAIL	tmp	1.007s
?   	tmp/client	[no test files]
?   	tmp/server	[no test files]
FAIL

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions