diff --git a/pkg/config/system_probe.go b/pkg/config/system_probe.go index d35d178c22196..7e2e07363854a 100644 --- a/pkg/config/system_probe.go +++ b/pkg/config/system_probe.go @@ -121,6 +121,8 @@ func InitSystemProbeConfig(cfg Config) { cfg.BindEnvAndSetDefault(join(spNS, "collect_dns_domains"), true, "DD_COLLECT_DNS_DOMAINS") cfg.BindEnvAndSetDefault(join(spNS, "max_dns_stats"), 20000) cfg.BindEnvAndSetDefault(join(spNS, "dns_timeout_in_s"), 15) + cfg.BindEnvAndSetDefault(join(spNS, "http_map_cleaner_interval_in_s"), 300) + cfg.BindEnvAndSetDefault(join(spNS, "http_idle_connection_ttl_in_s"), 30) cfg.BindEnvAndSetDefault(join(spNS, "enable_conntrack"), true) cfg.BindEnvAndSetDefault(join(spNS, "conntrack_max_state_size"), 65536*2) diff --git a/pkg/network/config/config.go b/pkg/network/config/config.go index d42dc51259611..a7689047c3b91 100644 --- a/pkg/network/config/config.go +++ b/pkg/network/config/config.go @@ -165,6 +165,12 @@ type Config struct { // EnableRootNetNs disables using the network namespace of the root process (1) // for things like creating netlink sockets for conntrack updates, etc. EnableRootNetNs bool + + // HTTPMapCleanerInterval is the interval to run the cleaner function. + HTTPMapCleanerInterval time.Duration + + // HTTPIdleConnectionTTL is the time an idle connection counted as "inactive" and should be deleted. + HTTPIdleConnectionTTL time.Duration } func join(pieces ...string) string { @@ -227,6 +233,9 @@ func New() *Config { RecordedQueryTypes: cfg.GetStringSlice(join(netNS, "dns_recorded_query_types")), EnableRootNetNs: cfg.GetBool(join(netNS, "enable_root_netns")), + + HTTPMapCleanerInterval: time.Duration(cfg.GetInt(join(spNS, "http_map_cleaner_interval_in_s"))) * time.Second, + HTTPIdleConnectionTTL: time.Duration(cfg.GetInt(join(spNS, "http_idle_connection_ttl_in_s"))) * time.Second, } if !cfg.IsSet(join(spNS, "max_closed_connections_buffered")) { diff --git a/pkg/network/http/ebpf_main.go b/pkg/network/http/ebpf_main.go index 2a0acc67c0abe..e88c3c3df1c7f 100644 --- a/pkg/network/http/ebpf_main.go +++ b/pkg/network/http/ebpf_main.go @@ -258,8 +258,8 @@ func (e *ebpfProgram) setupMapCleaner() { return } - ttl := maxRequestLinger.Nanoseconds() - httpMapCleaner.Clean(5*time.Minute, func(now int64, key, val interface{}) bool { + ttl := e.cfg.HTTPIdleConnectionTTL.Nanoseconds() + httpMapCleaner.Clean(e.cfg.HTTPMapCleanerInterval, func(now int64, key, val interface{}) bool { httpTX, ok := val.(*httpTX) if !ok { return false diff --git a/pkg/network/http/monitor_test.go b/pkg/network/http/monitor_test.go index 574b32ea98185..b715dd54b09c3 100644 --- a/pkg/network/http/monitor_test.go +++ b/pkg/network/http/monitor_test.go @@ -9,12 +9,17 @@ package http import ( + "bytes" "fmt" "io" "math/rand" "net" nethttp "net/http" "net/url" + "os" + "strconv" + "strings" + "sync" "testing" "time" @@ -26,12 +31,209 @@ import ( "github.com/DataDog/datadog-agent/pkg/util/kernel" ) -func TestHTTPMonitorIntegration(t *testing.T) { +const ( + kb = 1024 + mb = 1024 * kb +) + +var ( + emptyBody = []byte(nil) +) + +func skipTestIfKernelNotSupported(t *testing.T) { currKernelVersion, err := kernel.HostVersion() require.NoError(t, err) if currKernelVersion < kernel.VersionCode(4, 1, 0) { t.Skip("HTTP feature not available on pre 4.1.0 kernels") } +} + +// TestHTTPMonitorLoadWithIncompleteBuffers sends thousands of requests without getting responses for them, in parallel +// we send another request. We expect to capture the another request but not the incomplete requests. +func TestHTTPMonitorLoadWithIncompleteBuffers(t *testing.T) { + skipTestIfKernelNotSupported(t) + slowServerAddr := "localhost:8080" + fastServerAddr := "localhost:8081" + + slowSrvDoneFn := testutil.HTTPServer(t, slowServerAddr, testutil.Options{ + SlowResponse: time.Millisecond * 500, // Half a second. + WriteTimeout: time.Millisecond * 200, + ReadTimeout: time.Millisecond * 200, + }) + + fastSrvDoneFn := testutil.HTTPServer(t, fastServerAddr, testutil.Options{}) + + monitor, err := NewMonitor(config.New(), nil, nil) + require.NoError(t, err) + require.NoError(t, monitor.Start()) + defer monitor.Stop() + + abortedRequestFn := requestGenerator(t, fmt.Sprintf("%s/ignore", slowServerAddr), emptyBody) + wg := sync.WaitGroup{} + abortedRequests := make(chan *nethttp.Request, 100) + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + defer wg.Done() + req := abortedRequestFn() + abortedRequests <- req + }() + } + fastReq := requestGenerator(t, fastServerAddr, emptyBody)() + wg.Wait() + close(abortedRequests) + slowSrvDoneFn() + fastSrvDoneFn() + + foundFastReq := false + // We are iterating for a couple of iterations and making sure the aborted requests will never be found. + // Since the every call for monitor.GetHTTPStats will delete the pop all entries, and we want to find fastReq + // then we are using a variable to check if "we ever found it" among the iterations. + for i := 0; i < 10; i++ { + time.Sleep(10 * time.Millisecond) + stats := monitor.GetHTTPStats() + for req := range abortedRequests { + requestNotIncluded(t, stats, req) + } + + foundFastReq = foundFastReq || isRequestIncluded(stats, fastReq) + } + + require.True(t, foundFastReq) +} + +func TestHTTPMonitorIntegrationWithResponseBody(t *testing.T) { + skipTestIfKernelNotSupported(t) + targetAddr := "localhost:8080" + serverAddr := "localhost:8080" + + tests := []struct { + name string + requestBodySize int + }{ + { + name: "no body", + requestBodySize: 0, + }, + { + name: "1kb body", + requestBodySize: 1 * kb, + }, + { + name: "10kb body", + requestBodySize: 10 * kb, + }, + { + name: "100kb body", + requestBodySize: 100 * kb, + }, + { + name: "500kb body", + requestBodySize: 500 * kb, + }, + { + name: "2mb body", + requestBodySize: 2 * mb, + }, + { + name: "10mb body", + requestBodySize: 10 * mb, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + srvDoneFn := testutil.HTTPServer(t, serverAddr, testutil.Options{ + EnableKeepAlives: true, + }) + + monitor, err := NewMonitor(config.New(), nil, nil) + require.NoError(t, err) + require.NoError(t, monitor.Start()) + defer monitor.Stop() + + requestFn := requestGenerator(t, targetAddr, bytes.Repeat([]byte("a"), tt.requestBodySize)) + var requests []*nethttp.Request + for i := 0; i < 100; i++ { + requests = append(requests, requestFn()) + } + srvDoneFn() + + assertAllRequestsExists(t, monitor, requests) + }) + } +} + +func TestHTTPMonitorIntegrationSlowResponse(t *testing.T) { + skipTestIfKernelNotSupported(t) + targetAddr := "localhost:8080" + serverAddr := "localhost:8080" + + tests := []struct { + name string + mapCleanerIntervalSeconds int + httpIdleConnectionTTLSeconds int + slowResponseTime int + shouldCapture bool + }{ + { + name: "response reaching after cleanup", + mapCleanerIntervalSeconds: 1, + httpIdleConnectionTTLSeconds: 1, + slowResponseTime: 3, + shouldCapture: false, + }, + { + name: "response reaching before cleanup", + mapCleanerIntervalSeconds: 1, + httpIdleConnectionTTLSeconds: 3, + slowResponseTime: 1, + shouldCapture: true, + }, + { + name: "slow response reaching after ttl but cleaner not running", + mapCleanerIntervalSeconds: 3, + httpIdleConnectionTTLSeconds: 1, + slowResponseTime: 2, + shouldCapture: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + os.Setenv("DD_SYSTEM_PROBE_CONFIG_HTTP_MAP_CLEANER_INTERVAL_IN_S", strconv.Itoa(tt.mapCleanerIntervalSeconds)) + os.Setenv("DD_SYSTEM_PROBE_CONFIG_HTTP_IDLE_CONNECTION_TTL_IN_S", strconv.Itoa(tt.httpIdleConnectionTTLSeconds)) + + slowResponseTimeout := time.Duration(tt.slowResponseTime) * time.Second + serverTimeout := slowResponseTimeout + time.Second + srvDoneFn := testutil.HTTPServer(t, serverAddr, testutil.Options{ + WriteTimeout: serverTimeout, + ReadTimeout: serverTimeout, + SlowResponse: slowResponseTimeout, + }) + + monitor, err := NewMonitor(config.New(), nil, nil) + require.NoError(t, err) + require.NoError(t, monitor.Start()) + defer monitor.Stop() + + // Perform a number of random requests + req := requestGenerator(t, targetAddr, emptyBody)() + srvDoneFn() + + // Ensure all captured transactions get sent to user-space + time.Sleep(10 * time.Millisecond) + stats := monitor.GetHTTPStats() + + if tt.shouldCapture { + includesRequest(t, stats, req) + } else { + requestNotIncluded(t, stats, req) + } + }) + } +} + +func TestHTTPMonitorIntegration(t *testing.T) { + skipTestIfKernelNotSupported(t) targetAddr := "localhost:8080" serverAddr := "localhost:8080" @@ -49,11 +251,7 @@ func TestHTTPMonitorIntegration(t *testing.T) { } func TestHTTPMonitorIntegrationWithNAT(t *testing.T) { - currKernelVersion, err := kernel.HostVersion() - require.NoError(t, err) - if currKernelVersion < kernel.VersionCode(4, 1, 0) { - t.Skip("HTTP feature not available on pre 4.1.0 kernels") - } + skipTestIfKernelNotSupported(t) // SetupDNAT sets up a NAT translation from 2.2.2.2 to 1.1.1.1 netlink.SetupDNAT(t) @@ -73,11 +271,7 @@ func TestHTTPMonitorIntegrationWithNAT(t *testing.T) { } func TestUnknownMethodRegression(t *testing.T) { - currKernelVersion, err := kernel.HostVersion() - require.NoError(t, err) - if currKernelVersion < kernel.VersionCode(4, 1, 0) { - t.Skip("HTTP feature not available on pre 4.1.0 kernels") - } + skipTestIfKernelNotSupported(t) // SetupDNAT sets up a NAT translation from 2.2.2.2 to 1.1.1.1 netlink.SetupDNAT(t) @@ -96,7 +290,7 @@ func TestUnknownMethodRegression(t *testing.T) { require.NoError(t, err) defer monitor.Stop() - requestFn := requestGenerator(t, targetAddr) + requestFn := requestGenerator(t, targetAddr, emptyBody) for i := 0; i < 100; i++ { requestFn() } @@ -119,11 +313,7 @@ func TestUnknownMethodRegression(t *testing.T) { } func TestRSTPacketRegression(t *testing.T) { - currKernelVersion, err := kernel.HostVersion() - require.NoError(t, err) - if currKernelVersion < kernel.VersionCode(4, 1, 0) { - t.Skip("HTTP feature not available on pre 4.1.0 kernels") - } + skipTestIfKernelNotSupported(t) monitor, err := NewMonitor(config.New(), nil, nil) require.NoError(t, err) @@ -161,6 +351,21 @@ func TestRSTPacketRegression(t *testing.T) { includesRequest(t, stats, &nethttp.Request{URL: url}) } +func assertAllRequestsExists(t *testing.T, monitor *Monitor, requests []*nethttp.Request) { + requestsExist := make([]bool, len(requests)) + for i := 0; i < 10; i++ { + time.Sleep(10 * time.Millisecond) + stats := monitor.GetHTTPStats() + for reqIndex, req := range requests { + requestsExist[reqIndex] = requestsExist[reqIndex] || isRequestIncluded(stats, req) + } + } + + for reqIndex, exists := range requestsExist { + require.Truef(t, exists, "request %d was not found (req %v)", reqIndex, requests[reqIndex]) + } +} + func testHTTPMonitor(t *testing.T, targetAddr, serverAddr string, numReqs int, o testutil.Options) { srvDoneFn := testutil.HTTPServer(t, serverAddr, o) @@ -171,7 +376,7 @@ func testHTTPMonitor(t *testing.T, targetAddr, serverAddr string, numReqs int, o defer monitor.Stop() // Perform a number of random requests - requestFn := requestGenerator(t, targetAddr) + requestFn := requestGenerator(t, targetAddr, emptyBody) var requests []*nethttp.Request for i := 0; i < numReqs; i++ { requests = append(requests, requestFn()) @@ -179,51 +384,85 @@ func testHTTPMonitor(t *testing.T, targetAddr, serverAddr string, numReqs int, o srvDoneFn() // Ensure all captured transactions get sent to user-space - time.Sleep(10 * time.Millisecond) - stats := monitor.GetHTTPStats() - - // Assert all requests made were correctly captured by the monitor - for _, req := range requests { - includesRequest(t, stats, req) - } + assertAllRequestsExists(t, monitor, requests) } -func requestGenerator(t *testing.T, targetAddr string) func() *nethttp.Request { +var ( + httpMethods = []string{nethttp.MethodGet, nethttp.MethodHead, nethttp.MethodPost, nethttp.MethodPut, nethttp.MethodPatch, nethttp.MethodDelete, nethttp.MethodOptions} + httpMethodsWithBody = []string{nethttp.MethodPost, nethttp.MethodPut, nethttp.MethodPatch, nethttp.MethodDelete} + statusCodes = []int{nethttp.StatusOK, nethttp.StatusMultipleChoices, nethttp.StatusBadRequest, nethttp.StatusInternalServerError} +) + +func requestGenerator(t *testing.T, targetAddr string, reqBody []byte) func() *nethttp.Request { var ( - methods = []string{"GET", "HEAD", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"} - statusCodes = []int{200, 300, 400, 500} - random = rand.New(rand.NewSource(time.Now().Unix())) - idx = 0 - client = new(nethttp.Client) + random = rand.New(rand.NewSource(time.Now().Unix())) + idx = 0 + client = new(nethttp.Client) ) return func() *nethttp.Request { idx++ - method := methods[random.Intn(len(methods))] + var method string + var body io.Reader + var finalBody []byte + if len(reqBody) > 0 { + finalBody = append([]byte(strings.Repeat(" ", idx)), reqBody...) + body = bytes.NewReader(finalBody) + method = httpMethodsWithBody[random.Intn(len(httpMethodsWithBody))] + } else { + method = httpMethods[random.Intn(len(httpMethods))] + } status := statusCodes[random.Intn(len(statusCodes))] url := fmt.Sprintf("http://%s/%d/request-%d", targetAddr, status, idx) - req, err := nethttp.NewRequest(method, url, nil) + req, err := nethttp.NewRequest(method, url, body) require.NoError(t, err) resp, err := client.Do(req) + if strings.Contains(targetAddr, "ignore") { + return req + } require.NoError(t, err) - resp.Body.Close() + if len(reqBody) > 0 { + respBody, err := io.ReadAll(resp.Body) + require.NoError(t, err) + defer resp.Body.Close() + require.Equal(t, finalBody, respBody) + } return req } } func includesRequest(t *testing.T, allStats map[Key]*RequestStats, req *nethttp.Request) { + if !isRequestIncluded(allStats, req) { + expectedStatus := testutil.StatusFromPath(req.URL.Path) + t.Errorf( + "could not find HTTP transaction matching the following criteria:\n path=%s method=%s status=%d", + req.URL.Path, + req.Method, + expectedStatus, + ) + } +} + +func requestNotIncluded(t *testing.T, allStats map[Key]*RequestStats, req *nethttp.Request) { + if isRequestIncluded(allStats, req) { + expectedStatus := testutil.StatusFromPath(req.URL.Path) + t.Errorf( + "should not find HTTP transaction matching the following criteria:\n path=%s method=%s status=%d", + req.URL.Path, + req.Method, + expectedStatus, + ) + } +} + +func isRequestIncluded(allStats map[Key]*RequestStats, req *nethttp.Request) bool { expectedStatus := testutil.StatusFromPath(req.URL.Path) for key, stats := range allStats { if key.Path.Content == req.URL.Path && stats.HasStats(expectedStatus) { - return + return true } } - t.Errorf( - "could not find HTTP transaction matching the following criteria:\n path=%s method=%s status=%d", - req.URL.Path, - req.Method, - expectedStatus, - ) + return false } diff --git a/pkg/network/http/testutil/testutil.go b/pkg/network/http/testutil/testutil.go index f7a69ad477d11..f575606f62213 100644 --- a/pkg/network/http/testutil/testutil.go +++ b/pkg/network/http/testutil/testutil.go @@ -9,7 +9,6 @@ import ( "context" "fmt" "io" - "io/ioutil" "net" "net/http" "os" @@ -26,6 +25,9 @@ import ( type Options struct { EnableTLS bool EnableKeepAlives bool + ReadTimeout time.Duration + WriteTimeout time.Duration + SlowResponse time.Duration } // HTTPServer spins up a HTTP test server that returns the status code included in the URL @@ -36,9 +38,15 @@ type Options struct { // nolint func HTTPServer(t *testing.T, addr string, options Options) func() { handler := func(w http.ResponseWriter, req *http.Request) { + if options.SlowResponse != 0 { + time.Sleep(options.SlowResponse) + } statusCode := StatusFromPath(req.URL.Path) - io.Copy(ioutil.Discard, req.Body) w.WriteHeader(statusCode) + + reqBody, _ := io.ReadAll(req.Body) + defer req.Body.Close() + w.Write(reqBody) } srv := &http.Server{ @@ -56,6 +64,13 @@ func HTTPServer(t *testing.T, addr string, options Options) func() { } return err } + if options.ReadTimeout != 0 { + srv.ReadTimeout = options.ReadTimeout + } + + if options.WriteTimeout != 0 { + srv.WriteTimeout = options.WriteTimeout + } // If certPath is set we enabled TLS if options.EnableTLS {