diff --git a/jar/jar.go b/jar/jar.go index 9f6a1c6..7962403 100644 --- a/jar/jar.go +++ b/jar/jar.go @@ -29,6 +29,7 @@ import ( "strings" "sync" + "github.com/google/log4jscanner/pool" zipfork "github.com/google/log4jscanner/third_party/zip" "rsc.io/binaryregexp" ) @@ -255,10 +256,9 @@ var ( return make([]byte, bufSize) }, } - dynBufPool = sync.Pool{ - New: func() interface{} { - return make([]byte, bufSize) - }, + dynBufPool = pool.Dynamic{ + Pool: sync.Pool{New: func() interface{} { return make([]byte, 0) }}, + MinUtility: bufSize, } ) @@ -394,7 +394,7 @@ func (c *checker) checkFile(zf *zip.File, depth int, size int64, jar string) err } buf := dynBufPool.Get().([]byte) buf, err = readFull(f, fi, buf) - defer dynBufPool.Put(buf) + defer dynBufPool.Put(buf, float64(len(buf)), float64(cap(buf))) f.Close() // Recycle the flate buffer earlier, we're going to recurse. if err != nil { return fmt.Errorf("read file %s: %v", p, err) @@ -419,7 +419,11 @@ func readFull(r io.Reader, fi os.FileInfo, buf []byte) ([]byte, error) { return io.ReadAll(r) // If not a regular file, size may not be accurate. } if size := int(fi.Size()); cap(buf) < size { - buf = make([]byte, size) + capacity := size + if capacity < bufSize { + capacity = bufSize // Allocating much smaller buffers could lead to quick re-allocations. + } + buf = make([]byte, size, capacity) } else { buf = buf[:size] } diff --git a/pool/dynamic.go b/pool/dynamic.go new file mode 100644 index 0000000..14d61a5 --- /dev/null +++ b/pool/dynamic.go @@ -0,0 +1,67 @@ +// Package pool provides an object pool that trades off the cost of creation +// versus retention. It is meant to avoid the pessimal behaviour (see [issue +// 23199]) seen when using a regular sync.Pool with objects of dynamic sizes; +// objects that are too large are kept alive by repeat usages that don't need +// such sizes. 
+// +// [issue 23199]: https://github.com/golang/go/issues/23199 +package pool + +import ( + "math" + "sync" + "sync/atomic" +) + +// A Dynamic pool is like a sync.Pool for objects of varying sizes. +// +// It prevents the indefinite retention of (too) large objects by keeping a +// history of required object sizes (utility) and comparing them to the actual +// object size (cost) before accepting an object. +type Dynamic struct { + Pool sync.Pool + + // The utility below which the cost of creating the object is more expensive + // than just keeping it. Set this to the expected object size (or perhaps a + // bit larger to reduce allocations more). + MinUtility float64 + + avgUtility uint64 // Actually a float64, but that type does not have atomic ops. +} + +func (p *Dynamic) Get() interface{} { + return p.Pool.Get() +} + +// Put is like sync.Pool.Put, with a few differences. The utility is a measure +// of what part of the object was actually used. The cost is a measure of the +// total "size" of the object. Utility must be smaller than or equal to cost. +func (p *Dynamic) Put(v interface{}, utility, cost float64) bool { + // Update the average utility. Uses atomic load/store, which means that + // values can get lost if Put is called concurrently. That's fine, we're + // just looking for an approximate (weighted) moving average. + avgUtility := math.Float64frombits(atomic.LoadUint64(&p.avgUtility)) + avgUtility = decay(avgUtility, utility, p.MinUtility) + atomic.StoreUint64(&p.avgUtility, math.Float64bits(avgUtility)) + + if cost > 10*avgUtility { + return false // If the cost is 10x larger than the average utility, drop it. + } + p.Pool.Put(v) + return true +} + +// decay returns `val` if `val` > `prev`, otherwise it returns an +// exponentially moving average of `prev` and `val` (with factor 0.5). This is +// meant to provide a slower downramp if `val` drops ever lower. The minimum +// value is `min`. 
func decay(prev, val, min float64) float64 { + if val < min { + val = min + } + if prev == 0 || val > prev { + return val + } + const factor = 0.5 + return (prev * factor) + (val * (1 - factor)) +} diff --git a/pool/dynamic_test.go b/pool/dynamic_test.go new file mode 100644 index 0000000..32dbc26 --- /dev/null +++ b/pool/dynamic_test.go @@ -0,0 +1,80 @@ +package pool + +import ( + "math" + "sync" + "sync/atomic" + "testing" +) + +const bufSize = 4096 + +// The desired behaviour of the dynamic (buffer) pool is: +// - Don't retain (very) large items indefinitely (check that one is rejected +// at least once). +// - Do retain even large items for a while so their allocation cost is +// amortized. +func TestDynamic(t *testing.T) { + dp := Dynamic{ + Pool: sync.Pool{New: func() interface{} { return make([]byte, 0) }}, + MinUtility: bufSize, + } + var allocs int + // Simulate a sequence of file sizes. This sequence is not based on some + // real-life observed sequence of sizes of jar-in-jars. It might be better + // to use such a sequence, but every organisation will have its own expected + // sizes and this synthetic one contains some fairly extreme samples that + // check whether the algorithm is robust. + // + // For the current algorithm, the worst possible sequence is one that + // rises, then suddenly drops and then rises slowly again. We contend that + // this case is rare. 
+ sizes := [...]int{ + 100000, 1, 1, 1, 1, 1, 10, 1, 1, 1, 1, 1, 1, 1, 12, 1, 1, 1, 1, 1, 1, 1, + 1000, 100, 10000, 100000, 1, 100000, 1, 50000, 1, 1, 25000, 1, 1, 1, + 100000, 1, 1, 1, 1, 1, 1, 1, 1, 1, 100, 100, 100, 1, 1, 1, 1, 1, 100, + 200, 300, 100, 50, 50, 50, 50, 50, 1, 1, 1, 1, 100000000, 1000000, + 100000, 10000, 1000, 100, 10, 1, 1, 500, 2020, 400, 3984, 5, 200, 500, + 40000, 35000, 45000, 42000, 38000, 38000, 39000, 41000, 42000, 42000, // Average: 40000 + 2000, 4000, 3949, 2011, 4096, 33, 0, 4938, 1, 1, 1200, 2400, 1200, 200, + 400, 600, 700, 100, 400, 500, 700, 600, 900, 1000, 1100, 1200, 1000, + } + + var largeBufferPurged int + + t.Logf("num allocs value target capacity") + // This test assumes (with some margin for error) that back-to-back Put/Get + // on a pool from a single goroutine yield the same item. I believe this to + // be a fairly stable assumption avoiding plenty of testing boilerplate, + // time will tell. + for idx, size := range sizes { + buf := dp.Get().([]byte) + if cap(buf) < size { + capacity := size + if capacity < bufSize { + capacity = bufSize // Allocating much smaller buffers could lead to quick re-allocations. + } + buf = make([]byte, size, capacity) + allocs++ + } else { + buf = buf[:size] + } + utility := float64(len(buf)) + if utility < bufSize { + utility = bufSize + } + if !dp.Put(buf, utility, float64(cap(buf))) && cap(buf) >= 100000 { + largeBufferPurged++ + } + avgUtility := math.Float64frombits(atomic.LoadUint64(&dp.avgUtility)) + t.Logf("%d %d %d %f %d", idx+1, allocs, size, avgUtility, cap(buf)) + } + // Before the amortized buffer optimization, each iteration would've been + // one allocation. We want at least 10x fewer than that. + if got, want := allocs, len(sizes)/10; got > want { + t.Errorf("got %d allocations, wanted %d", got, want) + } + if got, atLeast := largeBufferPurged, 2; got < atLeast { + t.Errorf("buffers >= 100000 have been rejected %d times, expected at least %d", got, atLeast) + } +}