Skip to content

Commit 7b30194

Browse files
committed
improve: introduce github.com/panjf2000/ants for routine pool
1 parent abd2689 commit 7b30194

File tree

8 files changed

+113
-188
lines changed

8 files changed

+113
-188
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ require (
88
github.com/fatih/color v1.16.0
99
github.com/opencontainers/go-digest v1.0.0
1010
github.com/opencontainers/image-spec v1.1.0-rc5
11+
github.com/panjf2000/ants v1.3.0
1112
github.com/sirupsen/logrus v1.9.3
1213
github.com/spf13/cobra v1.8.0
1314
github.com/stretchr/testify v1.8.4

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@ github.com/opencontainers/runc v1.1.10 h1:EaL5WeO9lv9wmS6SASjszOeQdSctvpbu0DdBQB
8080
github.com/opencontainers/runc v1.1.10/go.mod h1:+/R6+KmDlh+hOO8NkjmgkG9Qzvypzk0yXxAPYYR65+M=
8181
github.com/opencontainers/runtime-spec v1.1.0 h1:HHUyrt9mwHUjtasSbXSMvs4cyFxh+Bll4AjJ9odEGpg=
8282
github.com/opencontainers/runtime-spec v1.1.0/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0=
83+
github.com/panjf2000/ants v1.3.0 h1:8pQ+8leaLc9lys2viEEr8md0U4RN6uOSUCE9bOYjQ9M=
84+
github.com/panjf2000/ants v1.3.0/go.mod h1:AaACblRPzq35m1g3enqYcxspbbiOJJYaxU2wMpm1cXY=
8385
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
8486
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
8587
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=

pkg/client/client.go

Lines changed: 62 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,14 @@ import (
66
"os"
77
"time"
88

9-
"github.com/AliyunContainerService/image-syncer/pkg/utils/types"
9+
"github.com/fatih/color"
10+
"github.com/panjf2000/ants"
11+
"github.com/sirupsen/logrus"
1012
"gopkg.in/yaml.v2"
1113

1214
"github.com/AliyunContainerService/image-syncer/pkg/concurrent"
1315
"github.com/AliyunContainerService/image-syncer/pkg/task"
14-
"github.com/fatih/color"
15-
"github.com/sirupsen/logrus"
16+
"github.com/AliyunContainerService/image-syncer/pkg/utils/types"
1617
)
1718

1819
// Client describes a synchronization client
@@ -97,7 +98,48 @@ func (c *Client) Run() error {
9798
}
9899
}
99100

100-
c.openRoutinesHandleTaskAndWaitForFinish()
101+
routinePool, _ := ants.NewPoolWithFunc(c.routineNum, func(i interface{}) {
102+
tTask, ok := i.(task.Task)
103+
if !ok {
104+
c.logger.Errorf("invalid task %v", i)
105+
return
106+
}
107+
108+
nextTasks, message, err := tTask.Run()
109+
count, total := c.taskCounter.Increase()
110+
finishedNumString := color.New(color.FgGreen).Sprintf("%d", count)
111+
totalNumString := color.New(color.FgGreen).Sprintf("%d", total)
112+
113+
if err != nil {
114+
c.failedTaskList.PushBack(tTask)
115+
c.failedTaskCounter.IncreaseTotal()
116+
c.logger.Errorf("Failed to executed %v: %v. Now %v/%v tasks have been processed.", tTask.String(), err,
117+
finishedNumString, totalNumString)
118+
} else {
119+
if tTask.Type() == task.ManifestType {
120+
// TODO: the ignored images will not be recorded in success images list
121+
c.successImagesList.Add(tTask.GetSource().String(), tTask.GetDestination().String())
122+
}
123+
124+
if len(message) != 0 {
125+
c.logger.Infof("Finish %v: %v. Now %v/%v tasks have been processed.", tTask.String(), message,
126+
finishedNumString, totalNumString)
127+
} else {
128+
c.logger.Infof("Finish %v. Now %v/%v tasks have been processed.", tTask.String(),
129+
finishedNumString, totalNumString)
130+
}
131+
}
132+
133+
for _, t := range nextTasks {
134+
c.taskList.PushFront(t)
135+
c.taskCounter.IncreaseTotal()
136+
}
137+
})
138+
defer routinePool.Release()
139+
140+
if err = c.handleTasks(routinePool); err != nil {
141+
return fmt.Errorf("failed to handle tasks: %v", err)
142+
}
101143

102144
for times := 0; times < c.retries; times++ {
103145
c.taskCounter, c.failedTaskCounter = c.failedTaskCounter, concurrent.NewCounter(0, 0)
@@ -110,7 +152,9 @@ func (c *Client) Run() error {
110152
if c.taskList.Len() != 0 {
111153
// retry to handle task
112154
c.logger.Infof("Start to retry tasks, please wait ...")
113-
c.openRoutinesHandleTaskAndWaitForFinish()
155+
if err = c.handleTasks(routinePool); err != nil {
156+
return fmt.Errorf("failed to handle tasks: %v", err)
157+
}
114158
}
115159
}
116160

@@ -144,73 +188,21 @@ func (c *Client) Run() error {
144188
return nil
145189
}
146190

147-
func (c *Client) openRoutinesHandleTaskAndWaitForFinish() {
148-
broadcastChan := concurrent.NewBroadcastChan(c.routineNum)
149-
broadcastChan.Broadcast()
150-
151-
go func() {
152-
for {
153-
// if all the worker routines is hung and taskList is empty, stop everything
154-
<-broadcastChan.TotalHungChan()
155-
if c.taskList.Len() == 0 {
156-
broadcastChan.Close()
191+
func (c *Client) handleTasks(routinePool *ants.PoolWithFunc) error {
192+
for {
193+
item := c.taskList.PopFront()
194+
// no more tasks need to handle
195+
if item == nil {
196+
if routinePool.Running() == 0 {
197+
break
157198
}
199+
time.Sleep(1 * time.Second)
200+
continue
158201
}
159-
}()
160-
161-
concurrent.CreateRoutinesAndWaitForFinish(c.routineNum, func() {
162-
for {
163-
closed := broadcastChan.Wait()
164-
165-
// run out of exist tasks
166-
for {
167-
item := c.taskList.PopFront()
168-
// no more tasks need to handle
169-
if item == nil {
170-
break
171-
}
172-
173-
tTask := item.(task.Task)
174-
175-
c.logger.Infof("Executing %v...", tTask.String())
176-
nextTasks, message, err := tTask.Run()
177-
178-
count, total := c.taskCounter.Increase()
179-
finishedNumString := color.New(color.FgGreen).Sprintf("%d", count)
180-
totalNumString := color.New(color.FgGreen).Sprintf("%d", total)
181-
182-
if err != nil {
183-
c.failedTaskList.PushBack(tTask)
184-
c.failedTaskCounter.IncreaseTotal()
185-
c.logger.Errorf("Failed to executed %v: %v. Now %v/%v tasks have been processed.", tTask.String(), err,
186-
finishedNumString, totalNumString)
187-
} else {
188-
if tTask.Type() == task.ManifestType {
189-
// TODO: the ignored images will not be recorded in success images list
190-
c.successImagesList.Add(tTask.GetSource().String(), tTask.GetDestination().String())
191-
}
192-
193-
if len(message) != 0 {
194-
c.logger.Infof("Finish %v: %v. Now %v/%v tasks have been processed.", tTask.String(), message,
195-
finishedNumString, totalNumString)
196-
} else {
197-
c.logger.Infof("Finish %v. Now %v/%v tasks have been processed.", tTask.String(),
198-
finishedNumString, totalNumString)
199-
}
200-
}
201-
202-
if nextTasks != nil {
203-
for _, t := range nextTasks {
204-
c.taskList.PushFront(t)
205-
c.taskCounter.IncreaseTotal()
206-
}
207-
broadcastChan.Broadcast()
208-
}
209-
}
210202

211-
if closed {
212-
return
213-
}
203+
if err := routinePool.Invoke(item); err != nil {
204+
return fmt.Errorf("failed to invoke routine: %v", err)
214205
}
215-
})
206+
}
207+
return nil
216208
}

pkg/concurrent/broadcastChan.go

Lines changed: 0 additions & 54 deletions
This file was deleted.

pkg/concurrent/counter.go

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,23 @@
11
package concurrent
22

3+
import "sync"
4+
35
type Counter struct {
4-
c chan struct{}
6+
sync.Mutex
57
count int
68
total int
79
}
810

911
func NewCounter(count, total int) *Counter {
1012
return &Counter{
11-
c: make(chan struct{}, 1),
1213
count: count,
1314
total: total,
1415
}
1516
}
1617

1718
func (c *Counter) Decrease() (int, int) {
18-
c.c <- struct{}{}
19-
defer func() {
20-
<-c.c
21-
}()
19+
c.Lock()
20+
defer c.Unlock()
2221

2322
if c.count > 0 {
2423
c.count--
@@ -27,10 +26,8 @@ func (c *Counter) Decrease() (int, int) {
2726
}
2827

2928
func (c *Counter) Increase() (int, int) {
30-
c.c <- struct{}{}
31-
defer func() {
32-
<-c.c
33-
}()
29+
c.Lock()
30+
defer c.Unlock()
3431

3532
if c.count < c.total {
3633
c.count++
@@ -39,21 +36,17 @@ func (c *Counter) Increase() (int, int) {
3936
}
4037

4138
func (c *Counter) IncreaseTotal() (int, int) {
42-
c.c <- struct{}{}
43-
defer func() {
44-
<-c.c
45-
}()
39+
c.Lock()
40+
defer c.Unlock()
4641

4742
c.total++
4843
return c.count, c.total
4944
}
5045

5146
// Value return count and total
5247
func (c *Counter) Value() (int, int) {
53-
c.c <- struct{}{}
54-
defer func() {
55-
<-c.c
56-
}()
48+
c.Lock()
49+
defer c.Unlock()
5750

5851
return c.count, c.total
5952
}

pkg/concurrent/imageList.go

Lines changed: 13 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,62 +1,53 @@
11
package concurrent
22

33
import (
4+
"sync"
5+
46
"github.com/AliyunContainerService/image-syncer/pkg/utils/types"
57
)
68

79
type ImageList struct {
8-
c chan struct{}
10+
sync.Mutex
911
content types.ImageList
1012
}
1113

1214
func NewImageList() *ImageList {
1315
return &ImageList{
14-
c: make(chan struct{}, 1),
1516
content: types.ImageList{},
1617
}
1718
}
1819

1920
func (i *ImageList) Add(src, dst string) {
20-
i.c <- struct{}{}
21-
defer func() {
22-
<-i.c
23-
}()
21+
i.Lock()
22+
defer i.Unlock()
2423

2524
i.content.Add(src, dst)
2625
}
2726

2827
func (i *ImageList) Query(src, dst string) bool {
29-
i.c <- struct{}{}
30-
defer func() {
31-
<-i.c
32-
}()
28+
i.Lock()
29+
defer i.Unlock()
3330

3431
return i.content.Query(src, dst)
3532
}
3633

3734
func (i *ImageList) Delete(key string) {
38-
i.c <- struct{}{}
39-
defer func() {
40-
<-i.c
41-
}()
35+
i.Lock()
36+
defer i.Unlock()
4237

4338
delete(i.content, key)
4439
}
4540

4641
func (i *ImageList) Rest() {
47-
i.c <- struct{}{}
48-
defer func() {
49-
<-i.c
50-
}()
42+
i.Lock()
43+
defer i.Unlock()
5144

5245
i.content = types.ImageList{}
5346
}
5447

5548
func (i *ImageList) Content() types.ImageList {
56-
i.c <- struct{}{}
57-
defer func() {
58-
<-i.c
59-
}()
49+
i.Lock()
50+
defer i.Unlock()
6051

6152
return i.content
6253
}

0 commit comments

Comments
 (0)