Skip to content

Commit ee3009b

Browse files
fix(joiner): resolve race condition
1 parent 78bca3c commit ee3009b

File tree

1 file changed

+30
-11
lines changed

1 file changed

+30
-11
lines changed

pkg/file/joiner/joiner.go

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,14 @@ func (g *decoderCache) createRemoveCallback(key string) func(error) {
8181
}
8282
}
8383

84+
// getFromCache safely retrieves a decoder from cache with lock
85+
func (g *decoderCache) getFromCache(key string) (storage.Getter, bool) {
86+
g.mu.Lock()
87+
defer g.mu.Unlock()
88+
d, ok := g.cache[key]
89+
return d, ok
90+
}
91+
8492
// GetOrCreate returns a decoder for the given chunk address
8593
func (g *decoderCache) GetOrCreate(addrs []swarm.Address, shardCnt int) storage.Getter {
8694
// since a recovery decoder is not allowed, simply return the underlying netstore
@@ -93,9 +101,8 @@ func (g *decoderCache) GetOrCreate(addrs []swarm.Address, shardCnt int) storage.
93101
}
94102

95103
key := fingerprint(addrs)
96-
g.mu.Lock()
97-
defer g.mu.Unlock()
98-
d, ok := g.cache[key]
104+
d, ok := g.getFromCache(key)
105+
99106
if ok {
100107
if d == nil {
101108
// The nil value indicates a previous successful recovery
@@ -105,15 +112,20 @@ func (g *decoderCache) GetOrCreate(addrs []swarm.Address, shardCnt int) storage.
105112
// Create a factory function that will instantiate the decoder only when needed
106113
recovery := func() storage.Getter {
107114
g.config.Logger.Debug("lazy-creating recovery decoder after fetch failed", "key", key)
115+
116+
if d, ok := g.getFromCache(key); ok && d != nil {
117+
return d
118+
}
119+
120+
newGetter := getter.New(addrs, shardCnt, g.fetcher, g.putter, decoderCallback, g.config)
121+
108122
g.mu.Lock()
109123
defer g.mu.Unlock()
110-
d, ok := g.cache[key]
111-
if ok && d != nil {
124+
if d, ok := g.cache[key]; ok && d != nil {
112125
return d
113126
}
114-
d = getter.New(addrs, shardCnt, g.fetcher, g.putter, decoderCallback, g.config)
115-
g.cache[key] = d
116-
return d
127+
g.cache[key] = newGetter
128+
return newGetter
117129
}
118130

119131
return getter.NewReDecoder(g.fetcher, recovery, g.config.Logger)
@@ -122,9 +134,16 @@ func (g *decoderCache) GetOrCreate(addrs []swarm.Address, shardCnt int) storage.
122134
}
123135

124136
removeCallback := g.createRemoveCallback(key)
125-
d = getter.New(addrs, shardCnt, g.fetcher, g.putter, removeCallback, g.config)
126-
g.cache[key] = d
127-
return d
137+
newGetter := getter.New(addrs, shardCnt, g.fetcher, g.putter, removeCallback, g.config)
138+
139+
// ensure no other goroutine created the same getter
140+
g.mu.Lock()
141+
defer g.mu.Unlock()
142+
if d, ok := g.cache[key]; ok {
143+
return d
144+
}
145+
g.cache[key] = newGetter
146+
return newGetter
128147
}
129148

130149
// New creates a new Joiner. A Joiner provides Read, Seek and Size functionalities.

0 commit comments

Comments
 (0)