From 393d4d64f5e0fced62d6d22783f27f8cd1165ae7 Mon Sep 17 00:00:00 2001 From: Matteo Pace Date: Thu, 4 Jan 2024 23:53:51 +0100 Subject: [PATCH] fix: chunk bodies and process partial (#252) --- .github/workflows/ci.yaml | 6 +-- go.mod | 6 +-- go.sum | 17 +++--- main_test.go | 50 ++++++++++++++--- wasmplugin/plugin.go | 111 +++++++++++++++++++++++--------------- 5 files changed, 121 insertions(+), 69 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index a006dff..57af61d 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -16,11 +16,11 @@ env: TINYGO_VERSION: 0.30.0 # Run e2e tests against latest two releases and latest dev ENVOY_IMAGES: > + envoyproxy/envoy:v1.28-latest envoyproxy/envoy:v1.27-latest - envoyproxy/envoy:v1.26-latest envoyproxy/envoy-dev:latest - istio/proxyv2:1.18.2 - istio/proxyv2:1.19.0 + istio/proxyv2:1.20.1 + istio/proxyv2:1.19.5 jobs: build: diff --git a/go.mod b/go.mod index 40d99cd..fc10b62 100644 --- a/go.mod +++ b/go.mod @@ -5,8 +5,8 @@ go 1.20 require ( github.com/corazawaf/coraza-wasilibs v0.1.0 github.com/corazawaf/coraza/v3 v3.0.4 - github.com/stretchr/testify v1.8.0 - github.com/tetratelabs/proxy-wasm-go-sdk v0.22.0 + github.com/stretchr/testify v1.8.4 + github.com/tetratelabs/proxy-wasm-go-sdk v0.22.1-0.20240102162926-b089ccb94219 github.com/tidwall/gjson v1.17.0 github.com/wasilibs/nottinygc v0.7.0 ) @@ -18,7 +18,7 @@ require ( github.com/magefile/mage v1.15.0 // indirect github.com/petar-dambovaliev/aho-corasick v0.0.0-20230725210150-fb29fc3c913e // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/tetratelabs/wazero v1.5.0 // indirect + github.com/tetratelabs/wazero v1.6.0 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.1 // indirect github.com/wasilibs/go-aho-corasick v0.5.0 // indirect diff --git a/go.sum b/go.sum index 44bc5ca..5da9553 100644 --- a/go.sum +++ b/go.sum @@ -4,7 +4,6 @@ github.com/corazawaf/coraza/v3 v3.0.4 h1:Llemgoh0hp2NggCwcWN8lNiV4Pfe+AWzf1oEcas github.com/corazawaf/coraza/v3 v3.0.4/go.mod h1:3fTYjY5BZv3nezLpH6NAap0gr3jZfbQWUAu2GF17ET4= github.com/corazawaf/libinjection-go v0.1.2 h1:oeiV9pc5rvJ+2oqOqXEAMJousPpGiup6f7Y3nZj5GoM= github.com/corazawaf/libinjection-go v0.1.2/go.mod h1:OP4TM7xdJ2skyXqNX1AN1wN5nNZEmJNuWbNPOItn7aw= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/foxcpp/go-mockdns v1.0.0 h1:7jBqxd3WDWwi/6WhDvacvH1XsN3rOLXyHM1uhvIx6FI= @@ -20,15 +19,12 @@ github.com/petar-dambovaliev/aho-corasick v0.0.0-20230725210150-fb29fc3c913e h1: github.com/petar-dambovaliev/aho-corasick v0.0.0-20230725210150-fb29fc3c913e/go.mod h1:EHPiTAKtiFmrMldLUNswFwfZ2eJIYBHktdaUTZxYWRw= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/tetratelabs/proxy-wasm-go-sdk v0.22.0 h1:kS7BvMKN+FiptV4pfwiNX8e3q14evxAWkhYbxt8EI1M= -github.com/tetratelabs/proxy-wasm-go-sdk v0.22.0/go.mod h1:qkW5MBz2jch2u8bS59wws65WC+Gtx3x0aPUX5JL7CXI= -github.com/tetratelabs/wazero v1.5.0 h1:Yz3fZHivfDiZFUXnWMPUoiW7s8tC1sjdBtlJn08qYa0= -github.com/tetratelabs/wazero v1.5.0/go.mod h1:0U0G41+ochRKoPKCJlh0jMg1CHkyfK8kDqiirMmKY8A= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/tetratelabs/proxy-wasm-go-sdk v0.22.1-0.20240102162926-b089ccb94219 h1:lBbCzjjZJl2+deUwrRv5+GQle18AkCoUkFwL8sPVvIE= +github.com/tetratelabs/proxy-wasm-go-sdk v0.22.1-0.20240102162926-b089ccb94219/go.mod h1:YqR8JZaY3Ev9ihXgjzAQAMkXEzPKKmy4Q5rsVWt4XGk= +github.com/tetratelabs/wazero v1.6.0 h1:z0H1iikCdP8t+q341xqepY4EWvHEw8Es7tlqiVzlP3g= +github.com/tetratelabs/wazero v1.6.0/go.mod h1:0U0G41+ochRKoPKCJlh0jMg1CHkyfK8kDqiirMmKY8A= github.com/tidwall/gjson v1.17.0 h1:/Jocvlh98kcTfpN2+JzGQWQcqrPQwDrVEMApx/M5ZwM= github.com/tidwall/gjson v1.17.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= @@ -54,7 +50,6 @@ golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= rsc.io/binaryregexp v0.2.0 h1:HfqmD5MEmC0zvwBuF187nq9mdnXjXsSivRiXN7SmRkE= diff --git a/main_test.go b/main_test.go index f311225..441ddf6 100644 --- a/main_test.go +++ b/main_test.go @@ -9,6 +9,8 @@ import ( "fmt" "os" "path/filepath" + "regexp" + "strconv" "strings" "testing" @@ -229,8 +231,8 @@ func TestLifecycle(t *testing.T) { { name: "request body accepted, no request body access", inlineRules: ` - SecRuleEngine On\nSecRequestBodyAccess Off\nSecRule REQUEST_BODY \"animal=bear\" \"id:101,phase:2,t:lowercase,deny\" - `, + SecRuleEngine On\nSecRequestBodyAccess Off\nSecRule REQUEST_BODY \"animal=bear\" \"id:101,phase:2,t:lowercase,deny\" + `, requestHdrsAction: types.ActionContinue, requestBodyAction: types.ActionContinue, responseHdrsAction: types.ActionContinue, @@ -240,8 +242,8 @@ func TestLifecycle(t *testing.T) { { name: "request body accepted, payload above process partial", inlineRules: ` - SecRuleEngine On\nSecRequestBodyAccess On\nSecRequestBodyLimit 2\nSecRequestBodyLimitAction ProcessPartial\nSecRule REQUEST_BODY \"animal=bear\" \"id:101,phase:2,t:lowercase,deny\" - `, + SecRuleEngine On\nSecRequestBodyAccess On\nSecRequestBodyLimit 2\nSecRequestBodyLimitAction ProcessPartial\nSecRule REQUEST_BODY \"animal=bear\" \"id:101,phase:2,t:lowercase,deny\" + `, requestHdrsAction: types.ActionContinue, requestBodyAction: types.ActionContinue, responseHdrsAction: types.ActionContinue, @@ -450,6 +452,16 @@ func TestLifecycle(t *testing.T) { // Stream bodies in chunks of 5 if requestHdrsAction == types.ActionContinue { + totalBodysent := 0 + requestBodyAccess := strings.Contains(tt.inlineRules, "SecRequestBodyAccess On") + requestBodyProcessPartial := strings.Contains(tt.inlineRules, "SecRequestBodyLimitAction ProcessPartial") + var requestBodyLimit int + matches := regexp.MustCompile(`SecRequestBodyLimit (\d+)`).FindStringSubmatch(tt.inlineRules) + if len(matches) > 1 { + var err error + requestBodyLimit, err = strconv.Atoi(matches[1]) + require.NoError(t, err) + } for i := 0; i < len(reqBody); i += 5 { eos := i+5 >= len(reqBody) var body []byte @@ -458,13 +470,20 @@ func TestLifecycle(t *testing.T) { } else { body = reqBody[i : i+5] } + totalBodysent += len(body) requestBodyAction = host.CallOnRequestBody(id, body, eos) - requestBodyAccess := strings.Contains(tt.inlineRules, "SecRequestBodyAccess On") switch { case eos: requireEqualAction(t, tt.requestBodyAction, requestBodyAction, "unexpected body action, want %q, have %q on end of stream") - case requestBodyAccess: + // Reject: We expect pause in all cases with action Reject: being the limit reached or not + case requestBodyAccess && !requestBodyProcessPartial: + requireEqualAction(t, types.ActionPause, requestBodyAction, "unexpected request body action, want %q, have %q") + // ProcessPartial: we expect pause until the limit is reached + case requestBodyAccess && requestBodyProcessPartial && totalBodysent < requestBodyLimit: requireEqualAction(t, types.ActionPause, requestBodyAction, "unexpected request body action, want %q, have %q") + // ProcessPartial: we expect tt.requestBodyAction when the limit is reached + case requestBodyAccess && requestBodyProcessPartial && totalBodysent >= requestBodyLimit: + requireEqualAction(t, tt.requestBodyAction, requestBodyAction, "unexpected request body action, want %q, have %q") default: requireEqualAction(t, types.ActionContinue, requestBodyAction, "unexpected request body action, want %q, have %q") } @@ -478,6 +497,15 @@ func TestLifecycle(t *testing.T) { if responseHdrsAction == types.ActionContinue { responseBodyAccess := strings.Contains(tt.inlineRules, "SecResponseBodyAccess On") + responseBodyProcessPartial := strings.Contains(tt.inlineRules, "SecResponseBodyLimitAction ProcessPartial") + var responseBodyLimit int + matches := regexp.MustCompile(`SecResponseBodyLimit (\d+)`).FindStringSubmatch(tt.inlineRules) + if len(matches) > 1 { + var err error + responseBodyLimit, err = strconv.Atoi(matches[1]) + require.NoError(t, err) + } + totalBodysent := 0 for i := 0; i < len(respBody); i += 5 { eos := i+5 >= len(respBody) var body []byte @@ -486,6 +514,7 @@ func TestLifecycle(t *testing.T) { } else { body = respBody[i : i+5] } + totalBodysent += len(body) responseBodyAction := host.CallOnResponseBody(id, body, eos) switch { // expectResponseRejectLimitActionSinceFirstChunk: writing the first chunk (len(respBody) bytes), it is expected to reach @@ -493,8 +522,13 @@ func TestLifecycle(t *testing.T) { // with the interruption enforced replacing the body with null bytes (checked with tt.respondedNullBody) case eos, tt.expectResponseRejectSinceFirstChunk: requireEqualAction(t, types.ActionContinue, responseBodyAction, "unexpected response body action, want %q, have %q on end of stream") - case responseBodyAccess: - requireEqualAction(t, types.ActionPause, responseBodyAction, "unexpected response body action, want %q, have %q") + // Reject: We expect pause in all cases with action Reject: being the limit reached or not + // It would either be paused because we are callectin the body or because the limit was reached and we triggered the action + case responseBodyAccess && !responseBodyProcessPartial: + requireEqualAction(t, types.ActionPause, responseBodyAction, "unexpected request body action, want %q, have %q") + // ProcessPartial: we expect pause until the limit is reached + case responseBodyAccess && responseBodyProcessPartial && totalBodysent < responseBodyLimit: + requireEqualAction(t, types.ActionPause, responseBodyAction, "unexpected request body action, want %q, have %q") default: requireEqualAction(t, types.ActionContinue, responseBodyAction, "unexpected response body action, want %q, have %q") } diff --git a/wasmplugin/plugin.go b/wasmplugin/plugin.go index 951a11f..e2fae78 100644 --- a/wasmplugin/plugin.go +++ b/wasmplugin/plugin.go @@ -384,36 +384,42 @@ func (ctx *httpContext) OnHttpRequestBody(bodySize int, endOfStream bool) types. return types.ActionContinue } - if bodySize > 0 { - b, err := proxywasm.GetHttpRequestBody(ctx.bodyReadIndex, bodySize) - if err == nil { - interruption, _, err := tx.WriteRequestBody(b) - if err != nil { - ctx.logger.Error().Err(err).Msg("Failed to write request body") - return types.ActionContinue - } - - if interruption != nil { - return ctx.handleInterruption(interruptionPhaseHttpRequestBody, interruption) - } - - ctx.bodyReadIndex += bodySize - } else if err != types.ErrorStatusNotFound { - // When using FWT sometimes (it is inconsistent) we receive calls where ctx.bodyReadIndex == bodySize - // meaning that the incoming size in the body is the same as the already read body. - // When that happens, this code fails to retrieve the body through proxywasm.GetHttpRequestBody - // as the total body is from 0 up to X bytes and since the last bodySize = X it attempts to read - // from X up to X bytes and it returns a types.ErrorStatusNotFound. This could happen despite - // endOfStream being true or false. - // The tests in 920410 show this problem. - // TODO(jcchavezs): Verify if this is a FTW problem. - ctx.logger.Error(). - Err(err). - Int("body_read_index", ctx.bodyReadIndex). + // bodySize is the size of the whole body received so far, not the size of the current chunk + chunkSize := bodySize - ctx.bodyReadIndex + // OnHttpRequestBody might be called more than once with the same data, we check if there is new data available to be read + if chunkSize > 0 { + bodyChunk, err := proxywasm.GetHttpRequestBody(ctx.bodyReadIndex, chunkSize) + if err != nil { + ctx.logger.Error().Err(err). Int("body_size", bodySize). + Int("body_read_index", ctx.bodyReadIndex). + Int("chunk_size", chunkSize). Msg("Failed to read request body") return types.ActionContinue } + readchunkSize := len(bodyChunk) + if readchunkSize != chunkSize { + ctx.logger.Warn().Int("read_chunk_size", readchunkSize).Int("chunk_size", chunkSize).Msg("Request chunk size read is different from the computed one") + } + interruption, writtenBytes, err := tx.WriteRequestBody(bodyChunk) + if err != nil { + ctx.logger.Error().Err(err).Msg("Failed to write request body") + return types.ActionContinue + } + if interruption != nil { + return ctx.handleInterruption(interruptionPhaseHttpRequestBody, interruption) + } + + // If not the whole chunk has been written, it implicitly means that we reached the waf request body limit. + // Internally ProcessRequestBody has been called and it did not raise any interruption (just checked in the condition above). + if writtenBytes < readchunkSize { + // No further body data will be processed + // Setting processedRequestBody avoid to call more than once ProcessRequestBody + ctx.processedRequestBody = true + return types.ActionContinue + } + + ctx.bodyReadIndex += readchunkSize } if endOfStream { @@ -531,6 +537,10 @@ func (ctx *httpContext) OnHttpResponseBody(bodySize int, endOfStream bool) types return replaceResponseBodyWhenInterrupted(ctx.logger, bodySize) } + if ctx.processedResponseBody { + return types.ActionContinue + } + if ctx.tx == nil { return types.ActionContinue } @@ -562,33 +572,46 @@ func (ctx *httpContext) OnHttpResponseBody(bodySize int, endOfStream bool) types return types.ActionContinue } - if bodySize > 0 { - body, err := proxywasm.GetHttpResponseBody(ctx.bodyReadIndex, bodySize) - if err == nil { - interruption, _, err := tx.WriteResponseBody(body) - if err != nil { - ctx.logger.Error().Err(err).Msg("Failed to write response body") - return types.ActionContinue - } - // bodyReadIndex has to be updated before evaluating the interruption - // it is internally needed to replace the full body if the tx is interrupted - ctx.bodyReadIndex += bodySize - if interruption != nil { - return ctx.handleInterruption(interruptionPhaseHttpResponseBody, interruption) - } - } else if err != types.ErrorStatusNotFound { + chunkSize := bodySize - ctx.bodyReadIndex + if chunkSize > 0 { + bodyChunk, err := proxywasm.GetHttpResponseBody(ctx.bodyReadIndex, chunkSize) + if err != nil { ctx.logger.Error(). - Int("body_read_index", ctx.bodyReadIndex). Int("body_size", bodySize). + Int("body_read_index", ctx.bodyReadIndex). + Int("chunk_size", chunkSize). Err(err). Msg("Failed to read response body") return types.ActionContinue } + + readchunkSize := len(bodyChunk) + if readchunkSize != chunkSize { + ctx.logger.Warn().Int("read_chunk_size", readchunkSize).Int("chunk_size", chunkSize).Msg("Response chunk size read is different from the computed one") + } + interruption, writtenBytes, err := tx.WriteResponseBody(bodyChunk) + if err != nil { + ctx.logger.Error().Err(err).Msg("Failed to write response body") + return types.ActionContinue + } + // bodyReadIndex has to be updated before evaluating the interruption + // it is internally needed to replace the full body if the transaction is interrupted + ctx.bodyReadIndex += readchunkSize + if interruption != nil { + return ctx.handleInterruption(interruptionPhaseHttpResponseBody, interruption) + } + // If not the whole chunk has been written, it implicitly means that we reached the waf response body limit, + // internally ProcessResponseBody has been called and it did not raise any interruption (just checked in the condition above). + if writtenBytes < readchunkSize { + // no further body data will be processed + ctx.processedResponseBody = true + return types.ActionContinue + } } if endOfStream { // We have already sent response headers, an unauthorized response can not be sent anymore, - // but we can still drop the response to prevent leaking sensitive content. + // but we can still drop the response body to prevent leaking sensitive content. // The error will also be logged by Coraza. ctx.processedResponseBody = true interruption, err := tx.ProcessResponseBody() @@ -604,7 +627,7 @@ func (ctx *httpContext) OnHttpResponseBody(bodySize int, endOfStream bool) types return types.ActionContinue } // Wait until we see the entire body. It has to be buffered in order to check that it is fully legit - // before sending it downstream + // before sending it downstream (to the client) return types.ActionPause }