diff --git a/changelog/fragments/1764710604-memcache-udp-fragment-check.yaml b/changelog/fragments/1764710604-memcache-udp-fragment-check.yaml new file mode 100644 index 000000000000..dbbc6526ac6e --- /dev/null +++ b/changelog/fragments/1764710604-memcache-udp-fragment-check.yaml @@ -0,0 +1,4 @@ +kind: bug-fix +summary: verify and cap memcache udp fragment counts +component: packetbeat +PR URL: https://github.com/elastic/beats/pull/47874 diff --git a/packetbeat/packetbeat_memcache_udp.yml b/packetbeat/packetbeat_memcache_udp.yml new file mode 100644 index 000000000000..5f87ed37cb62 --- /dev/null +++ b/packetbeat/packetbeat_memcache_udp.yml @@ -0,0 +1,24 @@ +packetbeat.interfaces: + device: lo + +packetbeat.protocols: + - type: memcache + ports: [11211] + udp_only: true + +queue.mem: + events: 64 + flush.min_events: 1 + +output.console: + enabled: true + pretty: true + +logging: + level: debug + selectors: [memcache] + to_files: false + +setup.template.enabled: false +setup.kibana: + host: 0.0.0.0:0 diff --git a/packetbeat/protos/memcache/plugin_udp.go b/packetbeat/protos/memcache/plugin_udp.go index 0f2933700369..e5d696bfc94e 100644 --- a/packetbeat/protos/memcache/plugin_udp.go +++ b/packetbeat/protos/memcache/plugin_udp.go @@ -82,6 +82,8 @@ type udpMessage struct { datagrams [][]byte } +const maxUDPMemcacheFragments = 64 + func (mc *memcache) ParseUDP(pkt *protos.Packet) { buffer := streambuf.NewFixed(pkt.Payload) header, err := parseUDPHeader(buffer) @@ -114,6 +116,11 @@ func (mc *memcache) ParseUDP(pkt *protos.Packet) { // get UDP transaction stream combining datagram packets in transaction udpMsg := trans.udpMessageForDir(&header, dir) + if udpMsg == nil { + logp.Warn("dropping memcache(UDP) transaction with invalid fragment metadata") + connection.killTransaction(trans) + return + } if udpMsg.numDatagrams != header.numDatagrams { logp.Warn("number of datagram mismatches in stream") connection.killTransaction(trans) @@ -143,7 +150,9 @@ func (mc *memcache) ParseUDP(pkt *protos.Packet) { if !done { trans.timer = time.AfterFunc(mc.udpConfig.transTimeout, func() { debug("transaction timeout -> forward") - mc.onUDPTrans(trans) + if err := mc.onUDPTrans(trans); err != nil { + logp.Warn("error processing timeout memcache transaction: %s", err) + } mc.udpExpTrans.push(trans) }) } @@ -283,18 +292,22 @@ func (t *udpTransaction) udpMessageForDir( udpMsg := t.messages[dir] if udpMsg == nil { udpMsg = newUDPMessage(header) + if udpMsg == nil { + return nil + } t.messages[dir] = udpMsg } return udpMsg } func newUDPMessage(header *mcUDPHeader) *udpMessage { - udpMsg := &udpMessage{ - numDatagrams: header.numDatagrams, - count: 0, + count := header.numDatagrams + if count == 0 || count > maxUDPMemcacheFragments { + return nil } - if header.numDatagrams > 1 { - udpMsg.datagrams = make([][]byte, header.numDatagrams) + udpMsg := &udpMessage{numDatagrams: count} + if count > 1 { + udpMsg.datagrams = make([][]byte, count) } return udpMsg } @@ -313,10 +326,14 @@ func (msg *udpMessage) addDatagram( } if msg.count < msg.numDatagrams { - if msg.datagrams[header.seqNumber] != nil { + idx := int(header.seqNumber) + if idx >= len(msg.datagrams) { + return nil + } + if msg.datagrams[idx] != nil { return nil } - msg.datagrams[header.seqNumber] = data + msg.datagrams[idx] = data msg.count++ } @@ -326,7 +343,9 @@ func (msg *udpMessage) addDatagram( buffer := streambuf.New(nil) for _, payload := range msg.datagrams { - buffer.Append(payload) + if err := buffer.Append(payload); err != nil { + return nil + } } msg.isComplete = true msg.datagrams = nil @@ -339,7 +358,9 @@ func parseUDPHeader(buf *streambuf.Buffer) (mcUDPHeader, error) { h.requestID, _ = buf.ReadNetUint16() h.seqNumber, _ = buf.ReadNetUint16() h.numDatagrams, _ = buf.ReadNetUint16() - buf.Advance(2) // ignore reserved + if err := buf.Advance(2); err != nil { // ignore reserved + return h, err + } return h, buf.Err() } diff --git a/packetbeat/protos/memcache/plugin_udp_test.go b/packetbeat/protos/memcache/plugin_udp_test.go index b80ef1e59cec..1c1e96ee6058 100644 --- a/packetbeat/protos/memcache/plugin_udp_test.go +++ b/packetbeat/protos/memcache/plugin_udp_test.go @@ -23,6 +23,9 @@ import ( "testing" "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/common/streambuf" + "github.com/elastic/beats/v7/packetbeat/protos/applayer" ) func Test_UdpDatagramAddOnCompleteMessage(t *testing.T) { @@ -96,3 +99,115 @@ func Test_UdpDatagramMultipleDups(t *testing.T) { assert.Equal(t, 8, buf.Len()) assert.Equal(t, []byte{1, 2, 3, 4, 5, 6, 7, 8}, buf.Bytes()) } + +func Test_NewUDPMessageZeroDatagrams(t *testing.T) { + hdr := &mcUDPHeader{requestID: 10, seqNumber: 0, numDatagrams: 0} + msg := newUDPMessage(hdr) + assert.Nil(t, msg) +} + +func Test_NewUDPMessageExceedsMaxFragments(t *testing.T) { + hdr := &mcUDPHeader{requestID: 10, seqNumber: 0, numDatagrams: maxUDPMemcacheFragments + 1} + msg := newUDPMessage(hdr) + assert.Nil(t, msg) +} + +func Test_NewUDPMessageAtMaxFragments(t *testing.T) { + hdr := &mcUDPHeader{requestID: 10, seqNumber: 0, numDatagrams: maxUDPMemcacheFragments} + msg := newUDPMessage(hdr) + assert.NotNil(t, msg) + assert.Equal(t, uint16(maxUDPMemcacheFragments), msg.numDatagrams) +} + +func Test_AddDatagramOutOfBounds(t *testing.T) { + hdr := &mcUDPHeader{requestID: 10, seqNumber: 0, numDatagrams: 2} + msg := newUDPMessage(hdr) + assert.NotNil(t, msg) + + // Add first datagram + buf := msg.addDatagram(hdr, []byte{1, 2}) + assert.Nil(t, buf) + + // Try to add datagram with seqNumber out of bounds + hdr.seqNumber = 2 // Only 0 and 1 are valid for numDatagrams=2 + buf = msg.addDatagram(hdr, []byte{3, 4}) + assert.Nil(t, buf) +} + +func Test_UdpMessageForDirReturnsNilWhenNewUDPMessageFails(t *testing.T) { + trans := &udpTransaction{ + messages: [2]*udpMessage{}, + } + + // Test with zero datagrams (should cause newUDPMessage to return nil) + hdr := &mcUDPHeader{requestID: 10, seqNumber: 0, numDatagrams: 0} + udpMsg := trans.udpMessageForDir(hdr, applayer.NetOriginalDirection) + assert.Nil(t, udpMsg) + + // Test with too many datagrams (should cause newUDPMessage to return nil) + hdr.numDatagrams = maxUDPMemcacheFragments + 1 + udpMsg = trans.udpMessageForDir(hdr, applayer.NetOriginalDirection) + assert.Nil(t, udpMsg) +} + +func Test_AddDatagramAppendErrorHandling(t *testing.T) { + // Test that addDatagram correctly handles buffer.Append errors + // This test verifies the error handling path in addDatagram where + // buffer.Append is called. In normal operation, Append should succeed, + // but we verify the code path exists and handles errors correctly. + hdr := &mcUDPHeader{requestID: 10, seqNumber: 0, numDatagrams: 3} + msg := newUDPMessage(hdr) + assert.NotNil(t, msg) + + // Add all datagrams in order - this exercises the buffer.Append path + buf := msg.addDatagram(hdr, []byte{1, 2}) + assert.Nil(t, buf) // Not complete yet + + hdr.seqNumber = 1 + buf = msg.addDatagram(hdr, []byte{3, 4}) + assert.Nil(t, buf) // Not complete yet + + hdr.seqNumber = 2 + buf = msg.addDatagram(hdr, []byte{5, 6}) + assert.NotNil(t, buf) // Should be complete now + + // Verify the buffer was correctly assembled + assert.Equal(t, 6, buf.Len()) + assert.Equal(t, []byte{1, 2, 3, 4, 5, 6}, buf.Bytes()) +} + +func Test_ParseUDPHeader(t *testing.T) { + // Test successful parsing of UDP header + // UDP header format: requestID (2 bytes) + seqNumber (2 bytes) + numDatagrams (2 bytes) + reserved (2 bytes) + headerData := []byte{ + 0x12, 0x34, // requestID = 0x1234 + 0x56, 0x78, // seqNumber = 0x5678 + 0x9A, 0xBC, // numDatagrams = 0x9ABC + 0x00, 0x00, // reserved + } + buf := streambuf.NewFixed(headerData) + hdr, err := parseUDPHeader(buf) + assert.NoError(t, err) + assert.Equal(t, uint16(0x1234), hdr.requestID) + assert.Equal(t, uint16(0x5678), hdr.seqNumber) + assert.Equal(t, uint16(0x9ABC), hdr.numDatagrams) +} + +func Test_ParseUDPHeaderInsufficientData(t *testing.T) { + // Test error handling when buffer is too short for Advance(2) + // Header needs 8 bytes total: 6 bytes for the three uint16s + 2 bytes for reserved + // This test uses only 6 bytes, so Advance(2) will fail + headerData := []byte{ + 0x12, 0x34, // requestID = 0x1234 + 0x56, 0x78, // seqNumber = 0x5678 + 0x9A, 0xBC, // numDatagrams = 0x9ABC + // Missing reserved 2 bytes - this will cause Advance(2) to fail + } + buf := streambuf.NewFixed(headerData) + hdr, err := parseUDPHeader(buf) + assert.Error(t, err) + // Header values should still be set from the reads before Advance failed + assert.Equal(t, uint16(0x1234), hdr.requestID) + assert.Equal(t, uint16(0x5678), hdr.seqNumber) + assert.Equal(t, uint16(0x9ABC), hdr.numDatagrams) +}