Skip to content

Commit

Permalink
feat(chore): storage handle fresh/stale at once with multi layer storage
Browse files Browse the repository at this point in the history
  • Loading branch information
darkweak committed Feb 20, 2024
1 parent 6b127c4 commit 822f12b
Show file tree
Hide file tree
Showing 10 changed files with 479 additions and 6 deletions.
19 changes: 14 additions & 5 deletions pkg/middleware/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func (s *SouinBaseHandler) Store(
// "Implies that the response is uncacheable"
status += "; detail=UPSTREAM-VARY-STAR"
} else {
cachedKey += rfc.GetVariedCacheKey(rq, variedHeaders)
variedKey := cachedKey + rfc.GetVariedCacheKey(rq, variedHeaders)
s.Configuration.GetLogger().Sugar().Debugf("Store the response %+v with duration %v", res, ma)

var wg sync.WaitGroup
Expand All @@ -267,12 +267,16 @@ func (s *SouinBaseHandler) Store(
case <-rq.Context().Done():
status += "; detail=REQUEST-CANCELED-OR-UPSTREAM-BROKEN-PIPE"
default:
vhs := http.Header{}
for _, hname := range variedHeaders {
vhs.Set(hname, rq.Header.Get(hname))
}
for _, storer := range s.Storers {
wg.Add(1)
go func(currentStorer types.Storer) {
defer wg.Done()
if currentStorer.Set(cachedKey, response, currentMatchedURL, ma) == nil {
s.Configuration.GetLogger().Sugar().Debugf("Stored the key %s in the %s provider", cachedKey, currentStorer.Name())
if currentStorer.SetMultiLevel(cachedKey, cachedKey, response, vhs, rq.Header.Get("Etag"), ma) == nil {
s.Configuration.GetLogger().Sugar().Debugf("Stored the key %s in the %s provider", variedKey, currentStorer.Name())
} else {
mu.Lock()
fails = append(fails, fmt.Sprintf("; detail=%s-INSERTION-ERROR", currentStorer.Name()))
Expand All @@ -285,7 +289,7 @@ func (s *SouinBaseHandler) Store(
if len(fails) < s.storersLen {
go func(rs http.Response, key string) {
_ = s.SurrogateKeyStorer.Store(&rs, key)
}(res, cachedKey)
}(res, variedKey)
status += "; stored"
}

Expand Down Expand Up @@ -530,7 +534,12 @@ func (s *SouinBaseHandler) ServeHTTP(rw http.ResponseWriter, rq *http.Request, n
validator := rfc.ParseRequest(req)
var response *http.Response
for _, currentStorer := range s.Storers {
response = currentStorer.Prefix(cachedKey, req, validator)
fresh, stale := currentStorer.GetMultiLevel(cachedKey, req, validator)

response = fresh
if fresh == nil {
response = stale
}
if response != nil {
s.Configuration.GetLogger().Sugar().Debugf("Found response in the %s storage", currentStorer.Name())
break
Expand Down
7 changes: 6 additions & 1 deletion pkg/rfc/revalidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,12 @@ func ParseRequest(req *http.Request) *Revalidator {
}

func ValidateETag(res *http.Response, validator *Revalidator) {
validator.ResponseETag = res.Header.Get("ETag")
ValidateETagFromHeader(res.Header.Get("ETag"), validator)

}

func ValidateETagFromHeader(etag string, validator *Revalidator) {
validator.ResponseETag = etag
validator.NeedRevalidation = validator.NeedRevalidation || validator.ResponseETag != ""
validator.Matched = validator.ResponseETag == "" || (validator.ResponseETag != "" && len(validator.RequestETags) == 0)

Expand Down
73 changes: 73 additions & 0 deletions pkg/storage/badgerProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"bytes"
"encoding/json"
"errors"
"net/http"
"regexp"
"strings"
Expand Down Expand Up @@ -200,6 +201,78 @@ func (provider *Badger) Prefix(key string, req *http.Request, validator *rfc.Rev
return result
}

// GetMultiLevel tries to load the key and check if one of linked keys is a fresh/stale candidate.
func (provider *Badger) GetMultiLevel(key string, req *http.Request, validator *rfc.Revalidator) (fresh *http.Response, stale *http.Response) {
var resultFresh *http.Response
var resultStale *http.Response

_ = provider.DB.View(func(tx *badger.Txn) error {
i, e := tx.Get([]byte(mappingKeyPrefix + key))
if e != nil && !errors.Is(e, badger.ErrKeyNotFound) {
return e
}

var val []byte
if i != nil {
_ = i.Value(func(b []byte) error {
val = b

return nil
})
}
resultFresh, resultStale, e = mappingElection(provider, val, req, validator, provider.logger)

return e
})

return resultFresh, resultStale
}

// SetMultiLevel tries to store the keywith the given value and update the mapping key to store metadata.
func (provider *Badger) SetMultiLevel(baseKey, key string, value []byte, variedHeaders http.Header, etag string, duration time.Duration) error {
now := time.Now()

err := provider.DB.Update(func(tx *badger.Txn) error {
var e error
e = tx.SetEntry(badger.NewEntry([]byte(key), value).WithTTL(duration))
if e != nil {
provider.logger.Sugar().Errorf("Impossible to set the key %s into Badger, %v", key, e)
return e
}

mappingKey := mappingKeyPrefix + baseKey
item, e := tx.Get([]byte(mappingKey))
if e != nil && !errors.Is(e, badger.ErrKeyNotFound) {
provider.logger.Sugar().Errorf("Impossible to get the base key %s in Badger, %v", mappingKey, e)
return e
}

var val []byte
if item != nil {
_ = item.Value(func(b []byte) error {
val = b

return nil
})
}

val, e = mappingUpdater(key, val, provider.logger, now, now.Add(duration), now.Add(duration+provider.stale), variedHeaders, etag)
if e != nil {
return e
}

provider.logger.Sugar().Errorf("Store the new mapping for the key %s in Badger, %v", key, string(val))

return tx.Set([]byte(mappingKey), val)
})

if err != nil {
provider.logger.Sugar().Errorf("Impossible to set value into Badger, %v", err)
}

return err
}

// Set method will store the response in Badger provider
func (provider *Badger) Set(key string, value []byte, url t.URL, duration time.Duration) error {
if duration == 0 {
Expand Down
48 changes: 48 additions & 0 deletions pkg/storage/embeddedOlricProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"bytes"
"context"
"errors"
"net/http"
"os"
"strings"
Expand Down Expand Up @@ -185,6 +186,53 @@ func (provider *EmbeddedOlric) Prefix(key string, req *http.Request, validator *
return nil
}

// GetMultiLevel tries to load the key and check if one of linked keys is a fresh/stale candidate.
func (provider *EmbeddedOlric) GetMultiLevel(key string, req *http.Request, validator *rfc.Revalidator) (fresh *http.Response, stale *http.Response) {
var resultFresh *http.Response
var resultStale *http.Response

res, e := provider.dm.Get(provider.ct, key)

if e != nil {
return resultFresh, resultStale
}

val, _ := res.Byte()
resultFresh, resultStale, _ = mappingElection(provider, val, req, validator, provider.logger)

return resultFresh, resultStale
}

// SetMultiLevel tries to store the keywith the given value and update the mapping key to store metadata.
func (provider *EmbeddedOlric) SetMultiLevel(baseKey, key string, value []byte, variedHeaders http.Header, etag string, duration time.Duration) error {
now := time.Now()

if err := provider.dm.Put(provider.ct, key, value, olric.EX(duration)); err != nil {
provider.logger.Sugar().Errorf("Impossible to set value into EmbeddedOlric, %v", err)
return err
}

mappingKey := mappingKeyPrefix + baseKey
res, e := provider.dm.Get(provider.ct, mappingKey)
if e != nil && !errors.Is(e, olric.ErrKeyNotFound) {
provider.logger.Sugar().Errorf("Impossible to get the key %s EmbeddedOlric, %v", baseKey, e)
return nil
}

val, e := res.Byte()
if e != nil {
provider.logger.Sugar().Errorf("Impossible to parse the key %s value as byte, %v", baseKey, e)
return e
}

val, e = mappingUpdater(key, val, provider.logger, now, now.Add(duration), now.Add(duration+provider.stale), variedHeaders, etag)
if e != nil {
return e
}

return provider.Set(mappingKey, val, t.URL{}, time.Hour)
}

// Get method returns the populated response if exists, empty response then
func (provider *EmbeddedOlric) Get(key string) []byte {
res, err := provider.dm.Get(provider.ct, key)
Expand Down
49 changes: 49 additions & 0 deletions pkg/storage/etcdProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,55 @@ func (provider *Etcd) Prefix(key string, req *http.Request, validator *rfc.Reval
return nil
}

// GetMultiLevel tries to load the key and check if one of linked keys is a fresh/stale candidate.
func (provider *Etcd) GetMultiLevel(key string, req *http.Request, validator *rfc.Revalidator) (fresh *http.Response, stale *http.Response) {
if provider.reconnecting {
provider.logger.Sugar().Error("Impossible to get the etcd key while reconnecting.")
return
}

var resultFresh *http.Response
var resultStale *http.Response

r, e := provider.Client.Get(provider.ctx, key)
if e != nil {
go provider.Reconnect()
return resultFresh, resultStale
}

if len(r.Kvs) > 0 {
resultFresh, resultStale, _ = mappingElection(provider, r.Kvs[0].Value, req, validator, provider.logger)
}

return resultFresh, resultStale
}

// SetMultiLevel tries to store the keywith the given value and update the mapping key to store metadata.
func (provider *Etcd) SetMultiLevel(baseKey, key string, value []byte, variedHeaders http.Header, etag string, duration time.Duration) error {
if provider.reconnecting {
provider.logger.Sugar().Error("Impossible to set the etcd value while reconnecting.")
return fmt.Errorf("reconnecting error")
}

now := time.Now()
if err := provider.Set(key, value, t.URL{}, duration); err != nil {
if !provider.reconnecting {
go provider.Reconnect()
}
provider.logger.Sugar().Errorf("Impossible to set value into Redis, %v", err)
return err
}

mappingKey := mappingKeyPrefix + baseKey
r := provider.Get(mappingKey)
val, e := mappingUpdater(mappingKeyPrefix+baseKey, []byte(r), provider.logger, now, now.Add(duration), now.Add(duration+provider.stale), variedHeaders, etag)
if e != nil {
return e
}

return provider.Set(mappingKey, val, t.URL{}, time.Hour)
}

// Set method will store the response in Etcd provider
func (provider *Etcd) Set(key string, value []byte, url t.URL, duration time.Duration) error {
if provider.reconnecting {
Expand Down
64 changes: 64 additions & 0 deletions pkg/storage/nutsProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"bytes"
"encoding/json"
"errors"
"net/http"
"strconv"
"strings"
Expand Down Expand Up @@ -214,6 +215,69 @@ func (provider *Nuts) Prefix(key string, req *http.Request, validator *rfc.Reval
return result
}

// GetMultiLevel tries to load the key and check if one of linked keys is a fresh/stale candidate.
func (provider *Nuts) GetMultiLevel(key string, req *http.Request, validator *rfc.Revalidator) (fresh *http.Response, stale *http.Response) {
var resultFresh *http.Response
var resultStale *http.Response

_ = provider.DB.View(func(tx *nutsdb.Tx) error {
i, e := tx.Get(bucket, []byte(mappingKeyPrefix+key))
if e != nil && !errors.Is(e, nutsdb.ErrKeyNotFound) {
return e
}

var val []byte
if i != nil {
val = i.Value
}
resultFresh, resultStale, e = mappingElection(provider, val, req, validator, provider.logger)

return e
})

return resultFresh, resultStale
}

// SetMultiLevel tries to store the keywith the given value and update the mapping key to store metadata.
func (provider *Nuts) SetMultiLevel(baseKey, key string, value []byte, variedHeaders http.Header, etag string, duration time.Duration) error {
now := time.Now()

err := provider.DB.Update(func(tx *nutsdb.Tx) error {
var e error
e = tx.Put(bucket, []byte(key), value, uint32(duration.Seconds()))
if e != nil {
provider.logger.Sugar().Errorf("Impossible to set the key %s into Nuts, %v", key, e)
return e
}

mappingKey := mappingKeyPrefix + baseKey
item, e := tx.Get(bucket, []byte(mappingKey))
if e != nil && !errors.Is(e, nutsdb.ErrKeyNotFound) {
provider.logger.Sugar().Errorf("Impossible to get the base key %s in Nuts, %v", baseKey, e)
return e
}

var val []byte
if item != nil {
val = item.Value
}
val, e = mappingUpdater(key, val, provider.logger, now, now.Add(duration), now.Add(duration+provider.stale), variedHeaders, etag)
if e != nil {
return e
}

provider.logger.Sugar().Errorf("Store the new mapping for the key %s in Nuts, %v", key, string(val))

return tx.Put(bucket, []byte(mappingKey), val, uint32(time.Hour.Seconds()))
})

if err != nil {
provider.logger.Sugar().Errorf("Impossible to set value into Nuts, %v", err)
}

return err
}

// Set method will store the response in Nuts provider
func (provider *Nuts) Set(key string, value []byte, url t.URL, duration time.Duration) error {
if duration == 0 {
Expand Down
Loading

0 comments on commit 822f12b

Please sign in to comment.