-
Notifications
You must be signed in to change notification settings - Fork 0
/
simfaas.go
216 lines (186 loc) · 4.8 KB
/
simfaas.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
package simfaas
import (
`context`
`errors`
`go.uber.org/atomic`
`log`
`sync`
`time`
)
const (
	// functionGCInterval is how often the background GC loop scans for
	// idle function instances to tear down.
	functionGCInterval = time.Second
)
var (
	// ErrFunctionNotFound is returned when an operation references a
	// function name that was never registered with Define.
	ErrFunctionNotFound = errors.New("function does not exist")
)
// FunctionConfig holds the simulated timing characteristics of a function.
type FunctionConfig struct {
	// ColdStart is the simulated delay incurred when deploying a new instance.
	ColdStart time.Duration
	// KeepWarm is how long an idle instance survives before the GC removes it.
	KeepWarm time.Duration
	// Runtime is the default simulated execution duration, used when a call
	// does not supply its own runtime.
	Runtime time.Duration
	// instanceCapacity defines the number of parallel executions a function instance can handle
	// Negative or zero indicates an infinite capacity; no more than 1 instance is used.
	// instanceCapacity int
}
// ExecutionReport summarizes a single simulated function execution.
type ExecutionReport struct {
	// ColdStart is the time spent waiting for an instance to deploy
	// (zero when a warm instance was already available).
	ColdStart time.Duration `json:"cold_start"`
	// Runtime is the observed wall time from start to finish,
	// which includes any cold start.
	Runtime time.Duration `json:"runtime"`
	// StartedAt is when the execution request began (before any cold start).
	StartedAt time.Time `json:"started_at"`
	// FinishedAt is when the simulated execution completed.
	FinishedAt time.Time `json:"finished_at"`
}
// Function is a simulated serverless function together with its runtime state.
type Function struct {
	*FunctionConfig
	// name is the key under which the function was registered with Define.
	name string
	// deployedAt is when the current instance was deployed; guarded by mu.
	deployedAt time.Time
	// lastExec is when the function last finished an execution; guarded by mu.
	lastExec time.Time
	// mu guards deployedAt and lastExec, and serializes deploys.
	mu sync.RWMutex
	// instances reports the number of instances of this function currently deployed.
	instances atomic.Uint32
	// active reports the number of executions of this function currently running.
	active atomic.Uint32
	// queued reports the number of executions of this function currently queued,
	// waiting for instances to free up or additional to be deployed.
	queued atomic.Uint32
}
// Platform simulates a FaaS platform: it tracks defined functions and
// platform-wide instance/execution counters.
type Platform struct {
	functions *sync.Map // map[string]*Function
	// init ensures the background GC is started at most once.
	init *sync.Once
	// stopFn cancels the background GC; set by Start.
	stopFn func()
	// activeInstances counts deployed instances across all functions.
	activeInstances atomic.Uint32
	// activeExecutions counts currently running executions across all functions.
	activeExecutions atomic.Uint32
	// queuedExecutions counts executions waiting on a cold start.
	queuedExecutions atomic.Uint32
}
// New creates an empty simulation platform with no functions defined.
// Call Start to begin garbage-collecting idle function instances.
func New() *Platform {
	p := &Platform{}
	p.functions = &sync.Map{}
	p.init = &sync.Once{}
	return p
}
// Start launches the background GC loop that reclaims idle function
// instances. It is idempotent: only the first call has any effect.
// It always returns nil.
func (p *Platform) Start() error {
	p.init.Do(func() {
		gcCtx, cancel := context.WithCancel(context.Background())
		p.stopFn = cancel
		go p.runFunctionGC(gcCtx.Done())
	})
	return nil
}
// runFunctionGC periodically scans all defined functions and tears down
// idle instances whose keep-warm window has expired. It runs until closeC
// is closed.
//
// Future: change to priority queue
func (p *Platform) runFunctionGC(closeC <-chan struct{}) {
	ticker := time.NewTicker(functionGCInterval)
	// Release the ticker's resources when the loop exits; the original
	// never stopped it.
	defer ticker.Stop()
	for {
		select {
		case <-closeC:
			return
		case <-ticker.C:
		}
		now := time.Now()
		p.RangeFunctions(func(k string, fn *Function) bool {
			// lastExec and deployedAt are written under fn.mu (in Run and
			// deploy); take the read lock so this scan does not race them.
			fn.mu.RLock()
			lastExec := fn.lastExec
			deployedAt := fn.deployedAt
			fn.mu.RUnlock()
			// Reclaim only instances that exist, are idle, and whose last
			// execution and deployment are both older than the keep-warm window.
			if fn.instances.Load() > 0 &&
				lastExec.Add(fn.KeepWarm).Before(now) &&
				deployedAt.Add(fn.KeepWarm).Before(now) &&
				fn.active.Load() == 0 {
				p.cleanup(fn)
				log.Printf("%s: cleaned up instance (1 -> 0)", k)
			}
			return true
		})
	}
}
// RangeFunctions invokes rangeFn for every defined function, stopping
// early if rangeFn returns false.
func (p *Platform) RangeFunctions(rangeFn func(k string, fn *Function) bool) {
	visit := func(key, value interface{}) bool {
		name, fn := key.(string), value.(*Function)
		return rangeFn(name, fn)
	}
	p.functions.Range(visit)
}
// ActiveExecutions reports how many executions are currently running
// across all functions.
func (p *Platform) ActiveExecutions() uint32 {
	n := p.activeExecutions.Load()
	return n
}
// QueuedExecutions reports how many executions are currently queued,
// waiting on a cold start, across all functions.
func (p *Platform) QueuedExecutions() uint32 {
	n := p.queuedExecutions.Load()
	return n
}
// ActiveFunctionInstances reports how many function instances are
// currently deployed across all functions.
func (p *Platform) ActiveFunctionInstances() uint32 {
	n := p.activeInstances.Load()
	return n
}
// Close stops the background GC started by Start. It is safe to call on
// a platform that was never started. It always returns nil.
func (p *Platform) Close() error {
	// stopFn is only assigned by Start; without this guard, closing an
	// unstarted platform panics on a nil function call.
	if p.stopFn != nil {
		p.stopFn()
	}
	return nil
}
// Define registers (or replaces) the function named fnName with the
// given simulated timing configuration.
func (p *Platform) Define(fnName string, config *FunctionConfig) {
	fn := &Function{
		FunctionConfig: config,
		name:           fnName,
	}
	p.functions.Store(fnName, fn)
}
// cleanup tears down fn's instance: it decrements the platform-wide
// instance counter and zeroes the function's instance count. Callers
// (the GC loop) only invoke it when fn.active is 0.
func (p *Platform) cleanup(fn *Function) {
	p.activeInstances.Dec()
	fn.instances.Store(0)
}
// Run emulates a function execution in a synchronous way,
// sleeping for the entire executionRuntime.
//
// If executionRuntime is nil, the function's configured Runtime is used.
// The returned report's Runtime is the observed wall time and therefore
// includes any cold start incurred; the cold start is also reported
// separately. Returns ErrFunctionNotFound for an undefined function.
//
// TODO emulate inputs and outputs
func (p *Platform) Run(fnName string, executionRuntime *time.Duration) (*ExecutionReport, error) {
	startedAt := time.Now()
	// Find the function
	fn, ok := p.Get(fnName)
	if !ok {
		return nil, ErrFunctionNotFound
	}
	// Ensure that there is enough capacity. deploy re-checks under fn.mu,
	// so concurrent callers that both observe 0 here cannot double-deploy.
	var coldStart time.Duration
	if fn.instances.Load() == 0 {
		p.queuedExecutions.Inc()
		fn.queued.Inc()
		coldStart = p.deploy(fn)
		fn.queued.Dec()
		p.queuedExecutions.Dec()
	}
	// Simulate the execution by sleeping for its runtime.
	fn.active.Inc()
	p.activeExecutions.Inc()
	runtime := fn.Runtime
	if executionRuntime != nil {
		runtime = *executionRuntime
	}
	time.Sleep(runtime)
	fn.active.Dec()
	p.activeExecutions.Dec()
	finishedAt := time.Now()
	// Record when this function last ran so the GC keeps it warm. Reuse
	// finishedAt rather than calling time.Now() again (the original took a
	// second reading, so lastExec and FinishedAt disagreed slightly).
	// fn.mu guards lastExec against concurrent readers.
	fn.mu.Lock()
	fn.lastExec = finishedAt
	fn.mu.Unlock()
	return &ExecutionReport{
		StartedAt:  startedAt,
		FinishedAt: finishedAt,
		Runtime:    finishedAt.Sub(startedAt),
		ColdStart:  coldStart,
	}, nil
}
// Get looks up a function by name, reporting whether it has been defined.
func (p *Platform) Get(fnName string) (*Function, bool) {
	if v, found := p.functions.Load(fnName); found {
		return v.(*Function), true
	}
	return nil, false
}
// Deploy ensures a warm instance of the named function exists, returning
// the cold-start duration incurred, or ErrFunctionNotFound if the
// function was never defined.
func (p *Platform) Deploy(fnName string) (coldStart time.Duration, err error) {
	fn, ok := p.Get(fnName)
	if !ok {
		return 0, ErrFunctionNotFound
	}
	coldStart = p.deploy(fn)
	return coldStart, nil
}
// deploy ensures exactly one instance of fn exists, simulating cold-start
// latency by sleeping for fn.ColdStart when a new instance is needed.
// It returns the time this caller spent waiting — the observed cold start
// (near zero when an instance was already warm).
func (p *Platform) deploy(fn *Function) (coldStart time.Duration) {
	startedAt := time.Now()
	// The lock serves two purposes: it serializes concurrent deploys (a
	// second caller blocks until the first cold start completes, then sees
	// instances == 1 and skips), and it guards the deployedAt write.
	fn.mu.Lock()
	if fn.instances.Load() == 0 {
		time.Sleep(fn.ColdStart) // simulate cold-start latency
		fn.instances.Store(1)
		p.activeInstances.Inc()
		fn.deployedAt = time.Now()
		log.Printf("%s: deployed instance (0 -> 1)", fn.name)
	}
	fn.mu.Unlock()
	// time.Since is the idiomatic form of time.Now().Sub(startedAt).
	return time.Since(startedAt)
}