Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
kind: bug-fix
summary: verify and cap memcache udp fragment counts
component: packetbeat
PR URL: https://github.com/elastic/beats/pull/47874
24 changes: 24 additions & 0 deletions packetbeat/packetbeat_memcache_udp.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
packetbeat.interfaces:
device: lo

packetbeat.protocols:
- type: memcache
ports: [11211]
udp_only: true

queue.mem:
events: 64
flush.min_events: 1

output.console:
enabled: true
pretty: true

logging:
level: debug
selectors: [memcache]
to_files: false

setup.template.enabled: false
setup.kibana:
host: 0.0.0.0:0
41 changes: 31 additions & 10 deletions packetbeat/protos/memcache/plugin_udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ type udpMessage struct {
datagrams [][]byte
}

const maxUDPMemcacheFragments = 64

func (mc *memcache) ParseUDP(pkt *protos.Packet) {
buffer := streambuf.NewFixed(pkt.Payload)
header, err := parseUDPHeader(buffer)
Expand Down Expand Up @@ -114,6 +116,11 @@ func (mc *memcache) ParseUDP(pkt *protos.Packet) {

// get UDP transaction stream combining datagram packets in transaction
udpMsg := trans.udpMessageForDir(&header, dir)
if udpMsg == nil {
logp.Warn("dropping memcache(UDP) transaction with invalid fragment metadata")
connection.killTransaction(trans)
return
}
if udpMsg.numDatagrams != header.numDatagrams {
logp.Warn("number of datagram mismatches in stream")
connection.killTransaction(trans)
Expand Down Expand Up @@ -143,7 +150,9 @@ func (mc *memcache) ParseUDP(pkt *protos.Packet) {
if !done {
trans.timer = time.AfterFunc(mc.udpConfig.transTimeout, func() {
debug("transaction timeout -> forward")
mc.onUDPTrans(trans)
if err := mc.onUDPTrans(trans); err != nil {
logp.Warn("error processing timeout memcache transaction: %s", err)
}
mc.udpExpTrans.push(trans)
})
}
Expand Down Expand Up @@ -283,18 +292,22 @@ func (t *udpTransaction) udpMessageForDir(
udpMsg := t.messages[dir]
if udpMsg == nil {
udpMsg = newUDPMessage(header)
if udpMsg == nil {
return nil
}
t.messages[dir] = udpMsg
}
return udpMsg
}

func newUDPMessage(header *mcUDPHeader) *udpMessage {
udpMsg := &udpMessage{
numDatagrams: header.numDatagrams,
count: 0,
count := header.numDatagrams
if count == 0 || count > maxUDPMemcacheFragments {
return nil
}
if header.numDatagrams > 1 {
udpMsg.datagrams = make([][]byte, header.numDatagrams)
udpMsg := &udpMessage{numDatagrams: count}
if count > 1 {
udpMsg.datagrams = make([][]byte, count)
}
return udpMsg
}
Expand All @@ -313,10 +326,14 @@ func (msg *udpMessage) addDatagram(
}

if msg.count < msg.numDatagrams {
if msg.datagrams[header.seqNumber] != nil {
idx := int(header.seqNumber)
if idx >= len(msg.datagrams) {
return nil
}
if msg.datagrams[idx] != nil {
return nil
}
msg.datagrams[header.seqNumber] = data
msg.datagrams[idx] = data
msg.count++
}

Expand All @@ -326,7 +343,9 @@ func (msg *udpMessage) addDatagram(

buffer := streambuf.New(nil)
for _, payload := range msg.datagrams {
buffer.Append(payload)
if err := buffer.Append(payload); err != nil {
return nil
}
}
msg.isComplete = true
msg.datagrams = nil
Expand All @@ -339,7 +358,9 @@ func parseUDPHeader(buf *streambuf.Buffer) (mcUDPHeader, error) {
h.requestID, _ = buf.ReadNetUint16()
h.seqNumber, _ = buf.ReadNetUint16()
h.numDatagrams, _ = buf.ReadNetUint16()
buf.Advance(2) // ignore reserved
if err := buf.Advance(2); err != nil { // ignore reserved
return h, err
}
return h, buf.Err()
}

Expand Down
115 changes: 115 additions & 0 deletions packetbeat/protos/memcache/plugin_udp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/common/streambuf"
"github.com/elastic/beats/v7/packetbeat/protos/applayer"
)

func Test_UdpDatagramAddOnCompleteMessage(t *testing.T) {
Expand Down Expand Up @@ -96,3 +99,115 @@ func Test_UdpDatagramMultipleDups(t *testing.T) {
assert.Equal(t, 8, buf.Len())
assert.Equal(t, []byte{1, 2, 3, 4, 5, 6, 7, 8}, buf.Bytes())
}

func Test_NewUDPMessageZeroDatagrams(t *testing.T) {
hdr := &mcUDPHeader{requestID: 10, seqNumber: 0, numDatagrams: 0}
msg := newUDPMessage(hdr)
assert.Nil(t, msg)
}

func Test_NewUDPMessageExceedsMaxFragments(t *testing.T) {
hdr := &mcUDPHeader{requestID: 10, seqNumber: 0, numDatagrams: maxUDPMemcacheFragments + 1}
msg := newUDPMessage(hdr)
assert.Nil(t, msg)
}

func Test_NewUDPMessageAtMaxFragments(t *testing.T) {
hdr := &mcUDPHeader{requestID: 10, seqNumber: 0, numDatagrams: maxUDPMemcacheFragments}
msg := newUDPMessage(hdr)
assert.NotNil(t, msg)
assert.Equal(t, uint16(maxUDPMemcacheFragments), msg.numDatagrams)
}

func Test_AddDatagramOutOfBounds(t *testing.T) {
hdr := &mcUDPHeader{requestID: 10, seqNumber: 0, numDatagrams: 2}
msg := newUDPMessage(hdr)
assert.NotNil(t, msg)

// Add first datagram
buf := msg.addDatagram(hdr, []byte{1, 2})
assert.Nil(t, buf)

// Try to add datagram with seqNumber out of bounds
hdr.seqNumber = 2 // Only 0 and 1 are valid for numDatagrams=2
buf = msg.addDatagram(hdr, []byte{3, 4})
assert.Nil(t, buf)
}

func Test_UdpMessageForDirReturnsNilWhenNewUDPMessageFails(t *testing.T) {
trans := &udpTransaction{
messages: [2]*udpMessage{},
}

// Test with zero datagrams (should cause newUDPMessage to return nil)
hdr := &mcUDPHeader{requestID: 10, seqNumber: 0, numDatagrams: 0}
udpMsg := trans.udpMessageForDir(hdr, applayer.NetOriginalDirection)
assert.Nil(t, udpMsg)

// Test with too many datagrams (should cause newUDPMessage to return nil)
hdr.numDatagrams = maxUDPMemcacheFragments + 1
udpMsg = trans.udpMessageForDir(hdr, applayer.NetOriginalDirection)
assert.Nil(t, udpMsg)
}

func Test_AddDatagramAppendErrorHandling(t *testing.T) {
// Test that addDatagram correctly handles buffer.Append errors
// This test verifies the error handling path in addDatagram where
// buffer.Append is called. In normal operation, Append should succeed,
// but we verify the code path exists and handles errors correctly.
hdr := &mcUDPHeader{requestID: 10, seqNumber: 0, numDatagrams: 3}
msg := newUDPMessage(hdr)
assert.NotNil(t, msg)

// Add all datagrams in order - this exercises the buffer.Append path
buf := msg.addDatagram(hdr, []byte{1, 2})
assert.Nil(t, buf) // Not complete yet

hdr.seqNumber = 1
buf = msg.addDatagram(hdr, []byte{3, 4})
assert.Nil(t, buf) // Not complete yet

hdr.seqNumber = 2
buf = msg.addDatagram(hdr, []byte{5, 6})
assert.NotNil(t, buf) // Should be complete now

// Verify the buffer was correctly assembled
assert.Equal(t, 6, buf.Len())
assert.Equal(t, []byte{1, 2, 3, 4, 5, 6}, buf.Bytes())
}

func Test_ParseUDPHeader(t *testing.T) {
// Test successful parsing of UDP header
// UDP header format: requestID (2 bytes) + seqNumber (2 bytes) + numDatagrams (2 bytes) + reserved (2 bytes)
headerData := []byte{
0x12, 0x34, // requestID = 0x1234
0x56, 0x78, // seqNumber = 0x5678
0x9A, 0xBC, // numDatagrams = 0x9ABC
0x00, 0x00, // reserved
}
buf := streambuf.NewFixed(headerData)
hdr, err := parseUDPHeader(buf)
assert.NoError(t, err)
assert.Equal(t, uint16(0x1234), hdr.requestID)
assert.Equal(t, uint16(0x5678), hdr.seqNumber)
assert.Equal(t, uint16(0x9ABC), hdr.numDatagrams)
}

func Test_ParseUDPHeaderInsufficientData(t *testing.T) {
// Test error handling when buffer is too short for Advance(2)
// Header needs 8 bytes total: 6 bytes for the three uint16s + 2 bytes for reserved
// This test uses only 6 bytes, so Advance(2) will fail
headerData := []byte{
0x12, 0x34, // requestID = 0x1234
0x56, 0x78, // seqNumber = 0x5678
0x9A, 0xBC, // numDatagrams = 0x9ABC
// Missing reserved 2 bytes - this will cause Advance(2) to fail
}
buf := streambuf.NewFixed(headerData)
hdr, err := parseUDPHeader(buf)
assert.Error(t, err)
// Header values should still be set from the reads before Advance failed
assert.Equal(t, uint16(0x1234), hdr.requestID)
assert.Equal(t, uint16(0x5678), hdr.seqNumber)
assert.Equal(t, uint16(0x9ABC), hdr.numDatagrams)
}