Skip to content

Commit

Permalink
feat(core): upstream select it's own storage (#582)
Browse files Browse the repository at this point in the history
* feat(core): upstream select it's own storage

* fix(ci): vendor and use sync.Mutex instead of reference
  • Loading branch information
darkweak authored Dec 22, 2024
1 parent 0e1579f commit 7649435
Show file tree
Hide file tree
Showing 9 changed files with 58 additions and 34 deletions.
63 changes: 44 additions & 19 deletions pkg/middleware/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,26 +358,51 @@ func (s *SouinBaseHandler) Store(
hn := strings.Split(hname, ":")
vhs.Set(hn[0], rq.Header.Get(hn[0]))
}
for _, storer := range s.Storers {
wg.Add(1)
go func(currentStorer types.Storer, currentRes http.Response) {
defer wg.Done()
if currentStorer.SetMultiLevel(
cachedKey,
variedKey,
response,
vhs,
currentRes.Header.Get("Etag"), ma,
variedKey,
) == nil {
s.Configuration.GetLogger().Debugf("Stored the key %s in the %s provider", variedKey, currentStorer.Name())
currentRes.Request = rq
} else {
mu.Lock()
fails = append(fails, fmt.Sprintf("; detail=%s-INSERTION-ERROR", currentStorer.Name()))
mu.Unlock()
if upstreamStorerTarget := res.Header.Get("X-Souin-Storer"); upstreamStorerTarget != "" {
res.Header.Del("X-Souin-Storer")

var overridedStorer types.Storer
for _, storer := range s.Storers {
if strings.Contains(strings.ToLower(storer.Name()), strings.ToLower(upstreamStorerTarget)) {
overridedStorer = storer
}
}(storer, res)
}

if overridedStorer.SetMultiLevel(
cachedKey,
variedKey,
response,
vhs,
res.Header.Get("Etag"), ma,
variedKey,
) == nil {
s.Configuration.GetLogger().Debugf("Stored the key %s in the %s provider", variedKey, overridedStorer.Name())
res.Request = rq
} else {
fails = append(fails, fmt.Sprintf("; detail=%s-INSERTION-ERROR", overridedStorer.Name()))
}
} else {
for _, storer := range s.Storers {
wg.Add(1)
go func(currentStorer types.Storer, currentRes http.Response) {
defer wg.Done()
if currentStorer.SetMultiLevel(
cachedKey,
variedKey,
response,
vhs,
currentRes.Header.Get("Etag"), ma,
variedKey,
) == nil {
s.Configuration.GetLogger().Debugf("Stored the key %s in the %s provider", variedKey, currentStorer.Name())
currentRes.Request = rq
} else {
mu.Lock()
fails = append(fails, fmt.Sprintf("; detail=%s-INSERTION-ERROR", currentStorer.Name()))
mu.Unlock()
}
}(storer, res)
}
}

wg.Wait()
Expand Down
4 changes: 2 additions & 2 deletions pkg/middleware/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func NewCustomWriter(rq *http.Request, rw http.ResponseWriter, b *bytes.Buffer)
Req: rq,
Rw: rw,
Headers: http.Header{},
mutex: &sync.Mutex{},
mutex: sync.Mutex{},
}
}

Expand All @@ -35,7 +35,7 @@ type CustomWriter struct {
Req *http.Request
Headers http.Header
headersSent bool
mutex *sync.Mutex
mutex sync.Mutex
statusCode int
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/surrogate/providers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ type baseStorage struct {
dynamic bool
keepStale bool
logger core.Logger
mu *sync.Mutex
mu sync.Mutex
duration time.Duration
}

Expand Down Expand Up @@ -159,7 +159,6 @@ func (s *baseStorage) init(config configurationtypes.AbstractConfigurationInterf
s.dynamic = config.GetDefaultCache().GetCDN().Dynamic
s.logger = config.GetLogger()
s.keysRegexp = keysRegexp
s.mu = &sync.Mutex{}
s.duration = storageToInfiniteTTLMap[s.Storage.Name()]
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/surrogate/providers/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func mockCommonProvider() *baseStorage {
Keys: make(map[string]configurationtypes.SurrogateKeys),
keysRegexp: make(map[string]keysRegexpInner),
dynamic: true,
mu: &sync.Mutex{},
mu: sync.Mutex{},
logger: zap.NewNop().Sugar(),
},
}
Expand Down
4 changes: 2 additions & 2 deletions plugins/traefik/override/middleware/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func NewCustomWriter(rq *http.Request, rw http.ResponseWriter, b *bytes.Buffer)
Req: rq,
Rw: rw,
Headers: http.Header{},
mutex: &sync.Mutex{},
mutex: sync.Mutex{},
}
}

Expand All @@ -34,7 +34,7 @@ type CustomWriter struct {
Req *http.Request
Headers http.Header
headersSent bool
mutex *sync.Mutex
mutex sync.Mutex
statusCode int
}

Expand Down
4 changes: 2 additions & 2 deletions plugins/traefik/override/surrogate/providers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ type baseStorage struct {
keysRegexp map[string]keysRegexpInner
dynamic bool
keepStale bool
mu *sync.Mutex
mu sync.Mutex
duration time.Duration
}

Expand Down Expand Up @@ -116,7 +116,7 @@ func (s *baseStorage) init(config configurationtypes.AbstractConfigurationInterf

s.dynamic = config.GetDefaultCache().GetCDN().Dynamic
s.keysRegexp = keysRegexp
s.mu = &sync.Mutex{}
s.mu = sync.Mutex{}
s.duration = storageToInfiniteTTLMap[s.Storage.Name()]
}

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions plugins/tyk/override/cache/surrogate/providers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type baseStorage struct {
dynamic bool
keepStale bool
logger *zap.Logger
mu *sync.Mutex
mu sync.Mutex
}

func (s *baseStorage) init(config configurationtypes.AbstractConfigurationInterface) {
Expand Down Expand Up @@ -103,7 +103,7 @@ func (s *baseStorage) init(config configurationtypes.AbstractConfigurationInterf
s.dynamic = config.GetDefaultCache().GetCDN().Dynamic
s.logger = config.GetLogger()
s.keysRegexp = keysRegexp
s.mu = &sync.Mutex{}
s.mu = sync.Mutex{}
}

func (s *baseStorage) storeTag(tag string, cacheKey string, re *regexp.Regexp) {
Expand Down

0 comments on commit 7649435

Please sign in to comment.