-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclient.go
107 lines (81 loc) · 2.17 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package centrifuge
import (
"crypto/tls"
"sync"
"time"
"github.com/cenkalti/backoff/v4"
v1Client "github.com/roadrunner-server/api/v4/build/centrifugo/api/v1"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
// Will register via init
_ "google.golang.org/grpc/encoding/gzip"
)
type client struct {
mu sync.RWMutex
log *zap.Logger
addr string
tls *TLS
compress bool
centrifugoClient v1Client.CentrifugoApiClient
}
func newClient(addr string, tls *TLS, log *zap.Logger, compress bool) *client {
return &client{
addr: addr,
tls: tls,
compress: compress,
log: log,
}
}
func (c *client) connect() error {
c.mu.Lock()
defer c.mu.Unlock()
cb := backoff.NewConstantBackOff(time.Minute)
opts := make([]grpc.CallOption, 0, 1)
if c.compress {
opts = append(opts, grpc.UseCompressor("gzip"))
}
var cert tls.Certificate
var err error
if c.tls != nil {
cert, err = tls.LoadX509KeyPair(c.tls.Cert, c.tls.Key)
if err != nil {
return err
}
}
operation := func() error {
if c.tls != nil {
tlscfg := &tls.Config{
Certificates: []tls.Certificate{cert},
MinVersion: tls.VersionTLS12,
}
conn, err := grpc.NewClient(c.addr, grpc.WithDefaultCallOptions(opts...), grpc.WithTransportCredentials(credentials.NewTLS(tlscfg)))
if err != nil {
c.log.Debug("attempted to connect to the centrifugo server with TLS, retrying", zap.Error(err))
return err
}
c.centrifugoClient = v1Client.NewCentrifugoApiClient(conn)
return nil
}
// non-tls
conn, err := grpc.NewClient(c.addr, grpc.WithDefaultCallOptions(opts...), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
c.log.Debug("attempted to connect to the centrifugo server, retrying", zap.Error(err))
return err
}
c.centrifugoClient = v1Client.NewCentrifugoApiClient(conn)
return nil
}
err = backoff.Retry(operation, cb)
if err != nil {
return err
}
c.log.Debug("connected to the centrifugo server")
return nil
}
func (c *client) client() v1Client.CentrifugoApiClient {
c.mu.RLock()
defer c.mu.RUnlock()
return c.centrifugoClient
}