Skip to content

Commit 4efbdd8

Browse files
canary iteration; public-proxy; private-proxy (#771)
1 parent 33fe17c commit 4efbdd8

4 files changed

+368
-21
lines changed

canary/privateHttpLooper.go

+236
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
package canary
2+
3+
import (
4+
"bytes"
5+
"context"
6+
cryptorand "crypto/rand"
7+
"encoding/base64"
8+
"github.com/openziti/sdk-golang/ziti"
9+
"github.com/openziti/sdk-golang/ziti/edge"
10+
"github.com/openziti/zrok/environment/env_core"
11+
"github.com/openziti/zrok/sdk/golang/sdk"
12+
"github.com/pkg/errors"
13+
"github.com/sirupsen/logrus"
14+
"io"
15+
"math/rand"
16+
"net"
17+
"net/http"
18+
"time"
19+
)
20+
21+
type PrivateHttpLooper struct {
22+
id uint
23+
acc *sdk.Access
24+
opt *LooperOptions
25+
root env_core.Root
26+
shr *sdk.Share
27+
listener edge.Listener
28+
abort bool
29+
done chan struct{}
30+
results *LooperResults
31+
}
32+
33+
func NewPrivateHttpLooper(id uint, opt *LooperOptions, root env_core.Root) *PrivateHttpLooper {
34+
return &PrivateHttpLooper{
35+
id: id,
36+
opt: opt,
37+
root: root,
38+
done: make(chan struct{}),
39+
results: &LooperResults{},
40+
}
41+
}
42+
43+
func (l *PrivateHttpLooper) Run() {
44+
defer close(l.done)
45+
defer logrus.Infof("#%d stopping", l.id)
46+
defer l.shutdown()
47+
logrus.Infof("#%d starting", l.id)
48+
49+
if err := l.startup(); err != nil {
50+
logrus.Fatalf("#%d error starting: %v", l.id, err)
51+
}
52+
53+
if err := l.bind(); err != nil {
54+
logrus.Fatalf("#%d error binding: %v", l.id, err)
55+
}
56+
57+
l.dwell()
58+
59+
l.iterate()
60+
61+
logrus.Infof("#%d completed", l.id)
62+
}
63+
64+
func (l *PrivateHttpLooper) Abort() {
65+
l.abort = true
66+
}
67+
68+
func (l *PrivateHttpLooper) Done() <-chan struct{} {
69+
return l.done
70+
}
71+
72+
func (l *PrivateHttpLooper) Results() *LooperResults {
73+
return l.results
74+
}
75+
76+
func (l *PrivateHttpLooper) startup() error {
77+
shr, err := sdk.CreateShare(l.root, &sdk.ShareRequest{
78+
ShareMode: sdk.PrivateShareMode,
79+
BackendMode: sdk.ProxyBackendMode,
80+
Target: "canary.PrivateHttpLooper",
81+
PermissionMode: sdk.ClosedPermissionMode,
82+
})
83+
if err != nil {
84+
return err
85+
}
86+
l.shr = shr
87+
88+
acc, err := sdk.CreateAccess(l.root, &sdk.AccessRequest{
89+
ShareToken: shr.Token,
90+
})
91+
if err != nil {
92+
return err
93+
}
94+
l.acc = acc
95+
96+
logrus.Infof("#%d allocated share '%v', allocated frontend '%v'", l.id, shr.Token, acc.Token)
97+
98+
return nil
99+
}
100+
101+
func (l *PrivateHttpLooper) bind() error {
102+
zif, err := l.root.ZitiIdentityNamed(l.root.EnvironmentIdentityName())
103+
if err != nil {
104+
return errors.Wrapf(err, "#%d error getting identity", l.id)
105+
}
106+
zcfg, err := ziti.NewConfigFromFile(zif)
107+
if err != nil {
108+
return errors.Wrapf(err, "#%d error loading ziti config", l.id)
109+
}
110+
options := ziti.ListenOptions{
111+
ConnectTimeout: 5 * time.Minute,
112+
WaitForNEstablishedListeners: 1,
113+
}
114+
zctx, err := ziti.NewContext(zcfg)
115+
if err != nil {
116+
return errors.Wrapf(err, "#%d error creating ziti context", l.id)
117+
}
118+
119+
if l.listener, err = zctx.ListenWithOptions(l.shr.Token, &options); err != nil {
120+
return errors.Wrapf(err, "#%d error binding listener", l.id)
121+
}
122+
123+
go func() {
124+
if err := http.Serve(l.listener, l); err != nil {
125+
logrus.Errorf("#%d error in http listener: %v", l.id, err)
126+
}
127+
}()
128+
129+
return nil
130+
}
131+
132+
func (l *PrivateHttpLooper) ServeHTTP(w http.ResponseWriter, r *http.Request) {
133+
buf := new(bytes.Buffer)
134+
io.Copy(buf, r.Body)
135+
w.Write(buf.Bytes())
136+
}
137+
138+
func (l *PrivateHttpLooper) dwell() {
139+
dwell := l.opt.MinDwell.Milliseconds()
140+
dwelta := l.opt.MaxDwell.Milliseconds() - l.opt.MinDwell.Milliseconds()
141+
if dwelta > 0 {
142+
dwell = int64(rand.Intn(int(dwelta)) + int(l.opt.MinDwell.Milliseconds()))
143+
}
144+
time.Sleep(time.Duration(dwell) * time.Millisecond)
145+
}
146+
147+
type connDialer struct {
148+
c net.Conn
149+
}
150+
151+
func (cd connDialer) Dial(_ context.Context, network, addr string) (net.Conn, error) {
152+
return cd.c, nil
153+
}
154+
155+
func (l *PrivateHttpLooper) iterate() {
156+
l.results.StartTime = time.Now()
157+
defer func() { l.results.StopTime = time.Now() }()
158+
159+
for i := uint(0); i < l.opt.Iterations; i++ {
160+
if i > 0 && i%l.opt.StatusInterval == 0 {
161+
logrus.Infof("#%d: iteration %d", l.id, i)
162+
}
163+
164+
conn, err := sdk.NewDialer(l.shr.Token, l.root)
165+
if err != nil {
166+
logrus.Errorf("#%d: error dialing: %v", l.id, err)
167+
l.results.Errors++
168+
time.Sleep(1 * time.Second)
169+
continue
170+
}
171+
172+
payloadSize := l.opt.MaxPayload
173+
payloadRange := l.opt.MaxPayload - l.opt.MinPayload
174+
if payloadRange > 0 {
175+
payloadSize = (rand.Uint64() % payloadRange) + l.opt.MinPayload
176+
}
177+
outPayload := make([]byte, payloadSize)
178+
cryptorand.Read(outPayload)
179+
outBase64 := base64.StdEncoding.EncodeToString(outPayload)
180+
181+
if req, err := http.NewRequest("POST", "http://"+l.shr.Token, bytes.NewBufferString(outBase64)); err == nil {
182+
client := &http.Client{Timeout: l.opt.Timeout, Transport: &http.Transport{DialContext: connDialer{conn}.Dial}}
183+
if resp, err := client.Do(req); err == nil {
184+
if resp.StatusCode != 200 {
185+
logrus.Errorf("#%d: unexpected status code: %v", l.id, resp.StatusCode)
186+
l.results.Errors++
187+
}
188+
inPayload := new(bytes.Buffer)
189+
io.Copy(inPayload, resp.Body)
190+
inBase64 := inPayload.String()
191+
if inBase64 != outBase64 {
192+
logrus.Errorf("#%d: payload mismatch", l.id)
193+
l.results.Mismatches++
194+
} else {
195+
l.results.Bytes += uint64(len(outBase64))
196+
logrus.Debugf("#%d: payload match", l.id)
197+
}
198+
} else {
199+
logrus.Errorf("#%d: error: %v", l.id, err)
200+
l.results.Errors++
201+
}
202+
} else {
203+
logrus.Errorf("#%d: error creating request: %v", l.id, err)
204+
l.results.Errors++
205+
}
206+
207+
if err := conn.Close(); err != nil {
208+
logrus.Errorf("#%d: error closing connection: %v", l.id, err)
209+
}
210+
211+
pacingMs := l.opt.MaxPacing.Milliseconds()
212+
pacingDelta := l.opt.MaxPacing.Milliseconds() - l.opt.MinPacing.Milliseconds()
213+
if pacingDelta > 0 {
214+
pacingMs = (rand.Int63() % pacingDelta) + l.opt.MinPacing.Milliseconds()
215+
time.Sleep(time.Duration(pacingMs) * time.Millisecond)
216+
}
217+
218+
l.results.Loops++
219+
}
220+
}
221+
222+
func (l *PrivateHttpLooper) shutdown() {
223+
if l.listener != nil {
224+
if err := l.listener.Close(); err != nil {
225+
logrus.Errorf("#%d error closing listener: %v", l.id, err)
226+
}
227+
}
228+
229+
if err := sdk.DeleteAccess(l.root, l.acc); err != nil {
230+
logrus.Errorf("#%d error deleting access '%v': %v", l.id, l.acc.Token, err)
231+
}
232+
233+
if err := sdk.DeleteShare(l.root, l.shr); err != nil {
234+
logrus.Errorf("#%d error deleting share '%v': %v", l.id, l.shr.Token, err)
235+
}
236+
}

canary/publicHttpLooper.go

+12-14
Original file line numberDiff line numberDiff line change
@@ -42,25 +42,22 @@ func NewPublicHttpLooper(id uint, frontend string, opt *LooperOptions, root env_
4242
func (l *PublicHttpLooper) Run() {
4343
defer close(l.done)
4444
defer logrus.Infof("#%d stopping", l.id)
45+
defer l.shutdown()
4546
logrus.Infof("#%d starting", l.id)
4647

4748
if err := l.startup(); err != nil {
4849
logrus.Fatalf("#%d error starting: %v", l.id, err)
4950
}
5051

51-
if err := l.bindListener(); err != nil {
52-
logrus.Fatalf("#%d error binding listener: %v", l.id, err)
52+
if err := l.bind(); err != nil {
53+
logrus.Fatalf("#%d error binding: %v", l.id, err)
5354
}
5455

5556
l.dwell()
5657

5758
l.iterate()
5859

5960
logrus.Infof("#%d completed", l.id)
60-
61-
if err := l.shutdown(); err != nil {
62-
logrus.Fatalf("#%d: error shutting down: %v", l.id, err)
63-
}
6461
}
6562

6663
func (l *PublicHttpLooper) Abort() {
@@ -87,12 +84,13 @@ func (l *PublicHttpLooper) startup() error {
8784
return err
8885
}
8986
l.shr = shr
87+
9088
logrus.Infof("#%d allocated share '%v'", l.id, l.shr.Token)
9189

9290
return nil
9391
}
9492

95-
func (l *PublicHttpLooper) bindListener() error {
93+
func (l *PublicHttpLooper) bind() error {
9694
zif, err := l.root.ZitiIdentityNamed(l.root.EnvironmentIdentityName())
9795
if err != nil {
9896
return errors.Wrapf(err, "#%d error getting identity", l.id)
@@ -148,8 +146,9 @@ func (l *PublicHttpLooper) iterate() {
148146
}
149147

150148
payloadSize := l.opt.MaxPayload
151-
if l.opt.MaxPayload-l.opt.MinPayload > 0 {
152-
payloadSize = rand.Uint64() % (l.opt.MaxPayload - l.opt.MinPayload)
149+
payloadRange := l.opt.MaxPayload - l.opt.MinPayload
150+
if payloadRange > 0 {
151+
payloadSize = (rand.Uint64() % payloadRange) + l.opt.MinPayload
153152
}
154153
outPayload := make([]byte, payloadSize)
155154
cryptorand.Read(outPayload)
@@ -184,23 +183,22 @@ func (l *PublicHttpLooper) iterate() {
184183
pacingMs := l.opt.MaxPacing.Milliseconds()
185184
pacingDelta := l.opt.MaxPacing.Milliseconds() - l.opt.MinPacing.Milliseconds()
186185
if pacingDelta > 0 {
187-
pacingMs = rand.Int63() % pacingDelta
186+
pacingMs = (rand.Int63() % pacingDelta) + l.opt.MinPacing.Milliseconds()
188187
time.Sleep(time.Duration(pacingMs) * time.Millisecond)
189188
}
189+
190190
l.results.Loops++
191191
}
192192
}
193193

194-
func (l *PublicHttpLooper) shutdown() error {
194+
func (l *PublicHttpLooper) shutdown() {
195195
if l.listener != nil {
196196
if err := l.listener.Close(); err != nil {
197197
logrus.Errorf("#%d error closing listener: %v", l.id, err)
198198
}
199199
}
200200

201201
if err := sdk.DeleteShare(l.root, l.shr); err != nil {
202-
return errors.Wrapf(err, "#%d error deleting share", l.id)
202+
logrus.Errorf("#%d error deleting share '%v': %v", l.id, l.shr.Token, err)
203203
}
204-
205-
return nil
206204
}

0 commit comments

Comments
 (0)