Skip to content

Commit b99d9ff

Browse files
committed
add test to detect and fix race conditions
1 parent 01192ca commit b99d9ff

File tree

4 files changed

+207
-8
lines changed

4 files changed

+207
-8
lines changed

pkg/mirror/repo_pool.go

+12-5
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,15 @@ func NewRepoPool(ctx context.Context, conf RepoPoolConfig, log *slog.Logger, com
6060
// signal repository
6161
repoCancel()
6262

63+
rp.lock.RLock()
64+
defer rp.lock.RUnlock()
65+
6366
for {
6467
time.Sleep(time.Second)
6568
// check if any repo mirror is still running
6669
var running bool
6770
for _, repo := range rp.repos {
68-
if repo.running {
71+
if repo.IsRunning() {
6972
running = true
7073
break
7174
}
@@ -143,7 +146,7 @@ func (rp *RepoPool) StartLoop() {
143146
defer rp.lock.RUnlock()
144147

145148
for _, repo := range rp.repos {
146-
if !repo.running {
149+
if !repo.IsRunning() {
147150
go repo.StartLoop(rp.ctx)
148151
continue
149152
}
@@ -192,22 +195,26 @@ func (rp *RepoPool) RepositoriesDirPath() []string {
192195

193196
// AddWorktreeLink is wrapper around repositories AddWorktreeLink method
194197
func (rp *RepoPool) AddWorktreeLink(remote string, wt WorktreeConfig) error {
195-
rp.lock.RLock()
196-
defer rp.lock.RUnlock()
197-
198198
repo, err := rp.Repository(remote)
199199
if err != nil {
200200
return err
201201
}
202202
if err := rp.validateLinkPath(repo, wt.Link); err != nil {
203203
return err
204204
}
205+
206+
rp.lock.Lock()
207+
defer rp.lock.Unlock()
208+
205209
return repo.AddWorktreeLink(wt)
206210
}
207211

208212
func (rp *RepoPool) validateLinkPath(repo *Repository, link string) error {
209213
newAbsLink := absLink(repo.root, link)
210214

215+
rp.lock.RLock()
216+
defer rp.lock.RUnlock()
217+
211218
for _, r := range rp.repos {
212219
for _, wl := range r.WorktreeLinks() {
213220
if wl.linkAbs == newAbsLink {

pkg/mirror/repository.go

+19-1
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,9 @@ func NewRepository(repoConf RepositoryConfig, envs []string, log *slog.Logger) (
133133

134134
// AddWorktreeLink adds workTree link to the mirror repository.
135135
func (r *Repository) AddWorktreeLink(wtc WorktreeConfig) error {
136+
r.lock.Lock()
137+
defer r.lock.Unlock()
138+
136139
if wtc.Link == "" {
137140
return fmt.Errorf("symlink path cannot be empty")
138141
}
@@ -391,17 +394,32 @@ func (r *Repository) cloneByRef(ctx context.Context, dst, ref, pathspec string,
391394
return hash, nil
392395
}
393396

397+
// returns if repositories mirror loop is running or not
398+
func (r *Repository) IsRunning() bool {
399+
r.lock.RLock()
400+
defer r.lock.RUnlock()
401+
402+
return r.running
403+
}
404+
394405
// StartLoop mirrors repository periodically based on repo's mirror interval
395406
func (r *Repository) StartLoop(ctx context.Context) {
396-
if r.running {
407+
if r.IsRunning() {
397408
r.log.Error("mirror loop has already been started")
398409
return
399410
}
411+
412+
r.lock.Lock()
400413
r.running = true
414+
r.lock.Unlock()
415+
401416
r.log.Info("started repository mirror loop", "interval", r.interval)
402417

403418
defer func() {
419+
r.lock.Lock()
404420
r.running = false
421+
r.lock.Unlock()
422+
405423
close(r.stopped)
406424
}()
407425

pkg/mirror/z_e2e_race_test.go

+174
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"path/filepath"
99
"sync"
1010
"testing"
11+
"time"
1112
)
1213

1314
func Test_mirror_detect_race(t *testing.T) {
@@ -106,3 +107,176 @@ func Test_mirror_detect_race(t *testing.T) {
106107
})
107108

108109
}
110+
111+
func Test_mirror_detect_race_repo_pool(t *testing.T) {
112+
testTmpDir := mustTmpDir(t)
113+
defer os.RemoveAll(testTmpDir)
114+
115+
tempClone := mustTmpDir(t)
116+
defer os.RemoveAll(tempClone)
117+
118+
upstream1 := filepath.Join(testTmpDir, testUpstreamRepo)
119+
remote1 := "file://" + upstream1
120+
upstream2 := filepath.Join(testTmpDir, "upstream2")
121+
remote2 := "file://" + upstream2
122+
root := filepath.Join(testTmpDir, testRoot)
123+
124+
fileU1SHA1 := mustInitRepo(t, upstream1, "file", t.Name()+"-u1-main-1")
125+
fileU2SHA1 := mustInitRepo(t, upstream2, "file", t.Name()+"-u2-main-1")
126+
127+
rpc := RepoPoolConfig{
128+
Defaults: DefaultConfig{
129+
Root: root, Interval: testInterval, MirrorTimeout: testTimeout, GitGC: "always",
130+
},
131+
}
132+
133+
rp, err := NewRepoPool(t.Context(), rpc, testLog, testENVs)
134+
if err != nil {
135+
t.Fatalf("unexpected error: %v", err)
136+
}
137+
138+
t.Run("add-remove-repo-test", func(t *testing.T) {
139+
wg := &sync.WaitGroup{}
140+
wg.Add(1)
141+
142+
// add/remove 2 repositories
143+
go func() {
144+
defer wg.Done()
145+
for i := 0; i < 10; i++ {
146+
t.Log("adding remote1", "count", i)
147+
readStopped := make(chan bool)
148+
ctx, cancel := context.WithCancel(t.Context())
149+
150+
newConfig := RepoPoolConfig{
151+
Defaults: DefaultConfig{
152+
Root: root, Interval: testInterval, MirrorTimeout: testTimeout, GitGC: "always",
153+
},
154+
Repositories: []RepositoryConfig{{
155+
Remote: remote1,
156+
Worktrees: []WorktreeConfig{{Link: "link1"}}},
157+
},
158+
}
159+
if err := newConfig.ValidateAndApplyDefaults(); err != nil {
160+
t.Error("failed to validate new config", "err", err)
161+
return
162+
}
163+
164+
if err := rp.AddRepository(newConfig.Repositories[0]); err != nil {
165+
t.Error("unexpected err", "err", err)
166+
return
167+
}
168+
169+
rp.StartLoop()
170+
171+
go func() {
172+
for {
173+
time.Sleep(1 * time.Second)
174+
select {
175+
case <-ctx.Done():
176+
close(readStopped)
177+
return
178+
default:
179+
if got, err := rp.Hash(txtCtx, remote1, "HEAD", ""); err != nil {
180+
t.Error("unexpected err", "count", i, "err", err)
181+
} else if got != fileU1SHA1 {
182+
t.Errorf("remote1 hash mismatch got:%s want:%s", got, fileU1SHA1)
183+
}
184+
}
185+
186+
}
187+
}()
188+
189+
if err := rp.AddWorktreeLink(remote1, WorktreeConfig{"link2", "", []string{}}); err != nil {
190+
t.Error("unexpected err", "err", err)
191+
return
192+
}
193+
194+
time.Sleep(2 * time.Second)
195+
196+
if err := rp.RemoveWorktreeLink(remote1, "link1"); err != nil {
197+
t.Error("unexpected err", "err", err)
198+
return
199+
}
200+
201+
cancel()
202+
<-readStopped
203+
204+
if err := rp.RemoveRepository(remote1); err != nil {
205+
t.Error("unexpected err", "err", err)
206+
return
207+
}
208+
}
209+
210+
}()
211+
212+
go func() {
213+
defer wg.Done()
214+
for i := 0; i < 10; i++ {
215+
t.Log("adding remote2", "count", i)
216+
readStopped := make(chan bool)
217+
ctx, cancel := context.WithCancel(t.Context())
218+
219+
newConfig := RepoPoolConfig{
220+
Defaults: DefaultConfig{
221+
Root: root, Interval: testInterval, MirrorTimeout: testTimeout, GitGC: "always",
222+
},
223+
Repositories: []RepositoryConfig{{Remote: remote2,
224+
Worktrees: []WorktreeConfig{{Link: "link3"}}},
225+
},
226+
}
227+
if err := newConfig.ValidateAndApplyDefaults(); err != nil {
228+
t.Error("failed to validate new config", "err", err)
229+
return
230+
}
231+
232+
if err := rp.AddRepository(newConfig.Repositories[0]); err != nil {
233+
t.Error("unexpected err", "err", err)
234+
return
235+
}
236+
237+
rp.StartLoop()
238+
239+
// start loop to trigger read on repo pool
240+
go func() {
241+
for {
242+
time.Sleep(1 * time.Second)
243+
select {
244+
case <-ctx.Done():
245+
close(readStopped)
246+
return
247+
default:
248+
if got, err := rp.Hash(txtCtx, remote2, "HEAD", ""); err != nil {
249+
t.Error("unexpected err", "count", i, "err", err)
250+
} else if got != fileU2SHA1 {
251+
t.Errorf("remote2 hash mismatch got:%s want:%s", got, fileU2SHA1)
252+
}
253+
}
254+
}
255+
}()
256+
257+
if err := rp.AddWorktreeLink(remote2, WorktreeConfig{"link4", "", []string{}}); err != nil {
258+
t.Error("unexpected err", "err", err)
259+
return
260+
}
261+
262+
time.Sleep(2 * time.Second)
263+
264+
if err := rp.RemoveWorktreeLink(remote2, "link3"); err != nil {
265+
t.Error("unexpected err", "err", err)
266+
return
267+
}
268+
269+
cancel()
270+
271+
<-readStopped
272+
273+
if err := rp.RemoveRepository(remote2); err != nil {
274+
t.Error("unexpected err", "err", err)
275+
return
276+
}
277+
}
278+
}()
279+
280+
wg.Wait()
281+
})
282+
}

pkg/mirror/z_e2e_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -1293,7 +1293,7 @@ func Test_mirror_loop(t *testing.T) {
12931293
go repo.StartLoop(txtCtx)
12941294

12951295
time.Sleep(testInterval)
1296-
if repo.running != true {
1296+
if repo.IsRunning() != true {
12971297
t.Errorf("repo running state is still false after starting mirror loop")
12981298
}
12991299

@@ -1326,7 +1326,7 @@ func Test_mirror_loop(t *testing.T) {
13261326

13271327
// STOP mirror loop
13281328
repo.StopLoop()
1329-
if repo.running {
1329+
if repo.IsRunning() {
13301330
t.Errorf("repo still running after calling StopLoop")
13311331
}
13321332
}

0 commit comments

Comments
 (0)