From 8c0e44c2a4ef2ba89d44f3527b46f3266da2f5f9 Mon Sep 17 00:00:00 2001 From: Guy Arbitman Date: Wed, 31 Aug 2022 18:47:26 +0100 Subject: [PATCH] add http monitor tests (#13114) * network: http: Changed the map clenaer interval to a configurable value with 5 minutes as default * network: http: tests: Added configurable read/write timeouts for the test server * network: http: tests: Added test for slow response This test checks the eBPF capturer with various scenarios for receiving a slow response from the test server and multiple IDLE http connection TTL and map cleaner running intervals * network: http: tests: Converted the test server into http echo server The change will allow us to send request body and get the same body in the response. It will allow us to have more scenarios to be tested against. * network: http: tests: Checking the eBPF capturing against scenarios in which we had request/response body We expect to have zero influence from the request/response body on the capturing process, no matter the size of the request/response. The tests here will examine this thesis against various body sizes * network: http: tests: Fixed TestHTTPMonitorLoadWithIncompleteBuffers Reduced number of requests during the load phase to 100. Closing the channel after all goroutines have ended (otherwise the loop on the channel will be blocked forever) Changed requestNotIncluded to includesRequest on the 'fast request' * network: http: tests: Fixed rebase error * network: http: tests: Made TestHTTPMonitorLoadWithIncompleteBuffers more robust * network: http: tests: Added more robust check * network: http: tests: Fixed flaky tests --- pkg/config/system_probe.go | 2 + pkg/network/config/config.go | 9 + pkg/network/http/ebpf_main.go | 4 +- pkg/network/http/monitor_test.go | 321 ++++++++++++++++++++++---- pkg/network/http/testutil/testutil.go | 19 +- 5 files changed, 310 insertions(+), 45 deletions(-) diff --git a/pkg/config/system_probe.go b/pkg/config/system_probe.go index d35d178c22196..7e2e07363854a 100644 --- a/pkg/config/system_probe.go +++ b/pkg/config/system_probe.go @@ -121,6 +121,8 @@ func InitSystemProbeConfig(cfg Config) { cfg.BindEnvAndSetDefault(join(spNS, "collect_dns_domains"), true, "DD_COLLECT_DNS_DOMAINS") cfg.BindEnvAndSetDefault(join(spNS, "max_dns_stats"), 20000) cfg.BindEnvAndSetDefault(join(spNS, "dns_timeout_in_s"), 15) + cfg.BindEnvAndSetDefault(join(spNS, "http_map_cleaner_interval_in_s"), 300) + cfg.BindEnvAndSetDefault(join(spNS, "http_idle_connection_ttl_in_s"), 30) cfg.BindEnvAndSetDefault(join(spNS, "enable_conntrack"), true) cfg.BindEnvAndSetDefault(join(spNS, "conntrack_max_state_size"), 65536*2) diff --git a/pkg/network/config/config.go b/pkg/network/config/config.go index d42dc51259611..a7689047c3b91 100644 --- a/pkg/network/config/config.go +++ b/pkg/network/config/config.go @@ -165,6 +165,12 @@ type Config struct { // EnableRootNetNs disables using the network namespace of the root process (1) // for things like creating netlink sockets for conntrack updates, etc. EnableRootNetNs bool + + // HTTPMapCleanerInterval is the interval to run the cleaner function. + HTTPMapCleanerInterval time.Duration + + // HTTPIdleConnectionTTL is the time an idle connection counted as "inactive" and should be deleted. + HTTPIdleConnectionTTL time.Duration } func join(pieces ...string) string { @@ -227,6 +233,9 @@ func New() *Config { RecordedQueryTypes: cfg.GetStringSlice(join(netNS, "dns_recorded_query_types")), EnableRootNetNs: cfg.GetBool(join(netNS, "enable_root_netns")), + + HTTPMapCleanerInterval: time.Duration(cfg.GetInt(join(spNS, "http_map_cleaner_interval_in_s"))) * time.Second, + HTTPIdleConnectionTTL: time.Duration(cfg.GetInt(join(spNS, "http_idle_connection_ttl_in_s"))) * time.Second, } if !cfg.IsSet(join(spNS, "max_closed_connections_buffered")) { diff --git a/pkg/network/http/ebpf_main.go b/pkg/network/http/ebpf_main.go index 2a0acc67c0abe..e88c3c3df1c7f 100644 --- a/pkg/network/http/ebpf_main.go +++ b/pkg/network/http/ebpf_main.go @@ -258,8 +258,8 @@ func (e *ebpfProgram) setupMapCleaner() { return } - ttl := maxRequestLinger.Nanoseconds() - httpMapCleaner.Clean(5*time.Minute, func(now int64, key, val interface{}) bool { + ttl := e.cfg.HTTPIdleConnectionTTL.Nanoseconds() + httpMapCleaner.Clean(e.cfg.HTTPMapCleanerInterval, func(now int64, key, val interface{}) bool { httpTX, ok := val.(*httpTX) if !ok { return false diff --git a/pkg/network/http/monitor_test.go b/pkg/network/http/monitor_test.go index 574b32ea98185..b715dd54b09c3 100644 --- a/pkg/network/http/monitor_test.go +++ b/pkg/network/http/monitor_test.go @@ -9,12 +9,17 @@ package http import ( + "bytes" "fmt" "io" "math/rand" "net" nethttp "net/http" "net/url" + "os" + "strconv" + "strings" + "sync" "testing" "time" @@ -26,12 +31,209 @@ import ( "github.com/DataDog/datadog-agent/pkg/util/kernel" ) -func TestHTTPMonitorIntegration(t *testing.T) { +const ( + kb = 1024 + mb = 1024 * kb +) + +var ( + emptyBody = []byte(nil) +) + +func skipTestIfKernelNotSupported(t *testing.T) { currKernelVersion, err := kernel.HostVersion() require.NoError(t, err) if currKernelVersion < kernel.VersionCode(4, 1, 0) { t.Skip("HTTP feature not available on pre 4.1.0 kernels") } +} + +// TestHTTPMonitorLoadWithIncompleteBuffers sends thousands of requests without getting responses for them, in parallel +// we send another request. We expect to capture the another request but not the incomplete requests. +func TestHTTPMonitorLoadWithIncompleteBuffers(t *testing.T) { + skipTestIfKernelNotSupported(t) + slowServerAddr := "localhost:8080" + fastServerAddr := "localhost:8081" + + slowSrvDoneFn := testutil.HTTPServer(t, slowServerAddr, testutil.Options{ + SlowResponse: time.Millisecond * 500, // Half a second. + WriteTimeout: time.Millisecond * 200, + ReadTimeout: time.Millisecond * 200, + }) + + fastSrvDoneFn := testutil.HTTPServer(t, fastServerAddr, testutil.Options{}) + + monitor, err := NewMonitor(config.New(), nil, nil) + require.NoError(t, err) + require.NoError(t, monitor.Start()) + defer monitor.Stop() + + abortedRequestFn := requestGenerator(t, fmt.Sprintf("%s/ignore", slowServerAddr), emptyBody) + wg := sync.WaitGroup{} + abortedRequests := make(chan *nethttp.Request, 100) + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + defer wg.Done() + req := abortedRequestFn() + abortedRequests <- req + }() + } + fastReq := requestGenerator(t, fastServerAddr, emptyBody)() + wg.Wait() + close(abortedRequests) + slowSrvDoneFn() + fastSrvDoneFn() + + foundFastReq := false + // We are iterating for a couple of iterations and making sure the aborted requests will never be found. + // Since the every call for monitor.GetHTTPStats will delete the pop all entries, and we want to find fastReq + // then we are using a variable to check if "we ever found it" among the iterations. + for i := 0; i < 10; i++ { + time.Sleep(10 * time.Millisecond) + stats := monitor.GetHTTPStats() + for req := range abortedRequests { + requestNotIncluded(t, stats, req) + } + + foundFastReq = foundFastReq || isRequestIncluded(stats, fastReq) + } + + require.True(t, foundFastReq) +} + +func TestHTTPMonitorIntegrationWithResponseBody(t *testing.T) { + skipTestIfKernelNotSupported(t) + targetAddr := "localhost:8080" + serverAddr := "localhost:8080" + + tests := []struct { + name string + requestBodySize int + }{ + { + name: "no body", + requestBodySize: 0, + }, + { + name: "1kb body", + requestBodySize: 1 * kb, + }, + { + name: "10kb body", + requestBodySize: 10 * kb, + }, + { + name: "100kb body", + requestBodySize: 100 * kb, + }, + { + name: "500kb body", + requestBodySize: 500 * kb, + }, + { + name: "2mb body", + requestBodySize: 2 * mb, + }, + { + name: "10mb body", + requestBodySize: 10 * mb, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + srvDoneFn := testutil.HTTPServer(t, serverAddr, testutil.Options{ + EnableKeepAlives: true, + }) + + monitor, err := NewMonitor(config.New(), nil, nil) + require.NoError(t, err) + require.NoError(t, monitor.Start()) + defer monitor.Stop() + + requestFn := requestGenerator(t, targetAddr, bytes.Repeat([]byte("a"), tt.requestBodySize)) + var requests []*nethttp.Request + for i := 0; i < 100; i++ { + requests = append(requests, requestFn()) + } + srvDoneFn() + + assertAllRequestsExists(t, monitor, requests) + }) + } +} + +func TestHTTPMonitorIntegrationSlowResponse(t *testing.T) { + skipTestIfKernelNotSupported(t) + targetAddr := "localhost:8080" + serverAddr := "localhost:8080" + + tests := []struct { + name string + mapCleanerIntervalSeconds int + httpIdleConnectionTTLSeconds int + slowResponseTime int + shouldCapture bool + }{ + { + name: "response reaching after cleanup", + mapCleanerIntervalSeconds: 1, + httpIdleConnectionTTLSeconds: 1, + slowResponseTime: 3, + shouldCapture: false, + }, + { + name: "response reaching before cleanup", + mapCleanerIntervalSeconds: 1, + httpIdleConnectionTTLSeconds: 3, + slowResponseTime: 1, + shouldCapture: true, + }, + { + name: "slow response reaching after ttl but cleaner not running", + mapCleanerIntervalSeconds: 3, + httpIdleConnectionTTLSeconds: 1, + slowResponseTime: 2, + shouldCapture: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + os.Setenv("DD_SYSTEM_PROBE_CONFIG_HTTP_MAP_CLEANER_INTERVAL_IN_S", strconv.Itoa(tt.mapCleanerIntervalSeconds)) + os.Setenv("DD_SYSTEM_PROBE_CONFIG_HTTP_IDLE_CONNECTION_TTL_IN_S", strconv.Itoa(tt.httpIdleConnectionTTLSeconds)) + + slowResponseTimeout := time.Duration(tt.slowResponseTime) * time.Second + serverTimeout := slowResponseTimeout + time.Second + srvDoneFn := testutil.HTTPServer(t, serverAddr, testutil.Options{ + WriteTimeout: serverTimeout, + ReadTimeout: serverTimeout, + SlowResponse: slowResponseTimeout, + }) + + monitor, err := NewMonitor(config.New(), nil, nil) + require.NoError(t, err) + require.NoError(t, monitor.Start()) + defer monitor.Stop() + + // Perform a number of random requests + req := requestGenerator(t, targetAddr, emptyBody)() + srvDoneFn() + + // Ensure all captured transactions get sent to user-space + time.Sleep(10 * time.Millisecond) + stats := monitor.GetHTTPStats() + + if tt.shouldCapture { + includesRequest(t, stats, req) + } else { + requestNotIncluded(t, stats, req) + } + }) + } +} + +func TestHTTPMonitorIntegration(t *testing.T) { + skipTestIfKernelNotSupported(t) targetAddr := "localhost:8080" serverAddr := "localhost:8080" @@ -49,11 +251,7 @@ func TestHTTPMonitorIntegration(t *testing.T) { } func TestHTTPMonitorIntegrationWithNAT(t *testing.T) { - currKernelVersion, err := kernel.HostVersion() - require.NoError(t, err) - if currKernelVersion < kernel.VersionCode(4, 1, 0) { - t.Skip("HTTP feature not available on pre 4.1.0 kernels") - } + skipTestIfKernelNotSupported(t) // SetupDNAT sets up a NAT translation from 2.2.2.2 to 1.1.1.1 netlink.SetupDNAT(t) @@ -73,11 +271,7 @@ func TestHTTPMonitorIntegrationWithNAT(t *testing.T) { } func TestUnknownMethodRegression(t *testing.T) { - currKernelVersion, err := kernel.HostVersion() - require.NoError(t, err) - if currKernelVersion < kernel.VersionCode(4, 1, 0) { - t.Skip("HTTP feature not available on pre 4.1.0 kernels") - } + skipTestIfKernelNotSupported(t) // SetupDNAT sets up a NAT translation from 2.2.2.2 to 1.1.1.1 netlink.SetupDNAT(t) @@ -96,7 +290,7 @@ func TestUnknownMethodRegression(t *testing.T) { require.NoError(t, err) defer monitor.Stop() - requestFn := requestGenerator(t, targetAddr) + requestFn := requestGenerator(t, targetAddr, emptyBody) for i := 0; i < 100; i++ { requestFn() } @@ -119,11 +313,7 @@ func TestUnknownMethodRegression(t *testing.T) { } func TestRSTPacketRegression(t *testing.T) { - currKernelVersion, err := kernel.HostVersion() - require.NoError(t, err) - if currKernelVersion < kernel.VersionCode(4, 1, 0) { - t.Skip("HTTP feature not available on pre 4.1.0 kernels") - } + skipTestIfKernelNotSupported(t) monitor, err := NewMonitor(config.New(), nil, nil) require.NoError(t, err) @@ -161,6 +351,21 @@ func TestRSTPacketRegression(t *testing.T) { includesRequest(t, stats, &nethttp.Request{URL: url}) } +func assertAllRequestsExists(t *testing.T, monitor *Monitor, requests []*nethttp.Request) { + requestsExist := make([]bool, len(requests)) + for i := 0; i < 10; i++ { + time.Sleep(10 * time.Millisecond) + stats := monitor.GetHTTPStats() + for reqIndex, req := range requests { + requestsExist[reqIndex] = requestsExist[reqIndex] || isRequestIncluded(stats, req) + } + } + + for reqIndex, exists := range requestsExist { + require.Truef(t, exists, "request %d was not found (req %v)", reqIndex, requests[reqIndex]) + } +} + func testHTTPMonitor(t *testing.T, targetAddr, serverAddr string, numReqs int, o testutil.Options) { srvDoneFn := testutil.HTTPServer(t, serverAddr, o) @@ -171,7 +376,7 @@ func testHTTPMonitor(t *testing.T, targetAddr, serverAddr string, numReqs int, o defer monitor.Stop() // Perform a number of random requests - requestFn := requestGenerator(t, targetAddr) + requestFn := requestGenerator(t, targetAddr, emptyBody) var requests []*nethttp.Request for i := 0; i < numReqs; i++ { requests = append(requests, requestFn()) @@ -179,51 +384,85 @@ func testHTTPMonitor(t *testing.T, targetAddr, serverAddr string, numReqs int, o srvDoneFn() // Ensure all captured transactions get sent to user-space - time.Sleep(10 * time.Millisecond) - stats := monitor.GetHTTPStats() - - // Assert all requests made were correctly captured by the monitor - for _, req := range requests { - includesRequest(t, stats, req) - } + assertAllRequestsExists(t, monitor, requests) } -func requestGenerator(t *testing.T, targetAddr string) func() *nethttp.Request { +var ( + httpMethods = []string{nethttp.MethodGet, nethttp.MethodHead, nethttp.MethodPost, nethttp.MethodPut, nethttp.MethodPatch, nethttp.MethodDelete, nethttp.MethodOptions} + httpMethodsWithBody = []string{nethttp.MethodPost, nethttp.MethodPut, nethttp.MethodPatch, nethttp.MethodDelete} + statusCodes = []int{nethttp.StatusOK, nethttp.StatusMultipleChoices, nethttp.StatusBadRequest, nethttp.StatusInternalServerError} +) + +func requestGenerator(t *testing.T, targetAddr string, reqBody []byte) func() *nethttp.Request { var ( - methods = []string{"GET", "HEAD", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"} - statusCodes = []int{200, 300, 400, 500} - random = rand.New(rand.NewSource(time.Now().Unix())) - idx = 0 - client = new(nethttp.Client) + random = rand.New(rand.NewSource(time.Now().Unix())) + idx = 0 + client = new(nethttp.Client) ) return func() *nethttp.Request { idx++ - method := methods[random.Intn(len(methods))] + var method string + var body io.Reader + var finalBody []byte + if len(reqBody) > 0 { + finalBody = append([]byte(strings.Repeat(" ", idx)), reqBody...) + body = bytes.NewReader(finalBody) + method = httpMethodsWithBody[random.Intn(len(httpMethodsWithBody))] + } else { + method = httpMethods[random.Intn(len(httpMethods))] + } status := statusCodes[random.Intn(len(statusCodes))] url := fmt.Sprintf("http://%s/%d/request-%d", targetAddr, status, idx) - req, err := nethttp.NewRequest(method, url, nil) + req, err := nethttp.NewRequest(method, url, body) require.NoError(t, err) resp, err := client.Do(req) + if strings.Contains(targetAddr, "ignore") { + return req + } require.NoError(t, err) - resp.Body.Close() + if len(reqBody) > 0 { + respBody, err := io.ReadAll(resp.Body) + require.NoError(t, err) + defer resp.Body.Close() + require.Equal(t, finalBody, respBody) + } return req } } func includesRequest(t *testing.T, allStats map[Key]*RequestStats, req *nethttp.Request) { + if !isRequestIncluded(allStats, req) { + expectedStatus := testutil.StatusFromPath(req.URL.Path) + t.Errorf( + "could not find HTTP transaction matching the following criteria:\n path=%s method=%s status=%d", + req.URL.Path, + req.Method, + expectedStatus, + ) + } +} + +func requestNotIncluded(t *testing.T, allStats map[Key]*RequestStats, req *nethttp.Request) { + if isRequestIncluded(allStats, req) { + expectedStatus := testutil.StatusFromPath(req.URL.Path) + t.Errorf( + "should not find HTTP transaction matching the following criteria:\n path=%s method=%s status=%d", + req.URL.Path, + req.Method, + expectedStatus, + ) + } +} + +func isRequestIncluded(allStats map[Key]*RequestStats, req *nethttp.Request) bool { expectedStatus := testutil.StatusFromPath(req.URL.Path) for key, stats := range allStats { if key.Path.Content == req.URL.Path && stats.HasStats(expectedStatus) { - return + return true } } - t.Errorf( - "could not find HTTP transaction matching the following criteria:\n path=%s method=%s status=%d", - req.URL.Path, - req.Method, - expectedStatus, - ) + return false } diff --git a/pkg/network/http/testutil/testutil.go b/pkg/network/http/testutil/testutil.go index f7a69ad477d11..f575606f62213 100644 --- a/pkg/network/http/testutil/testutil.go +++ b/pkg/network/http/testutil/testutil.go @@ -9,7 +9,6 @@ import ( "context" "fmt" "io" - "io/ioutil" "net" "net/http" "os" @@ -26,6 +25,9 @@ import ( type Options struct { EnableTLS bool EnableKeepAlives bool + ReadTimeout time.Duration + WriteTimeout time.Duration + SlowResponse time.Duration } // HTTPServer spins up a HTTP test server that returns the status code included in the URL @@ -36,9 +38,15 @@ type Options struct { // nolint func HTTPServer(t *testing.T, addr string, options Options) func() { handler := func(w http.ResponseWriter, req *http.Request) { + if options.SlowResponse != 0 { + time.Sleep(options.SlowResponse) + } statusCode := StatusFromPath(req.URL.Path) - io.Copy(ioutil.Discard, req.Body) w.WriteHeader(statusCode) + + reqBody, _ := io.ReadAll(req.Body) + defer req.Body.Close() + w.Write(reqBody) } srv := &http.Server{ @@ -56,6 +64,13 @@ func HTTPServer(t *testing.T, addr string, options Options) func() { } return err } + if options.ReadTimeout != 0 { + srv.ReadTimeout = options.ReadTimeout + } + + if options.WriteTimeout != 0 { + srv.WriteTimeout = options.WriteTimeout + } // If certPath is set we enabled TLS if options.EnableTLS {