Add influxdb reporting for replication prober
p2004a committed Aug 21, 2023
1 parent 0a021f6 commit 35b1a32
Showing 5 changed files with 255 additions and 14 deletions.
12 changes: 12 additions & 0 deletions cmd/replicationprober/envhelpers.go
@@ -46,3 +46,15 @@ func getRequiredStrEnv(name string) string {
}
return envStr
}

func getBoolEnv(name string, defaultValue bool) bool {
envStr := os.Getenv(name)
if envStr == "" {
return defaultValue
}
envBool, err := strconv.ParseBool(envStr)
if err != nil {
log.Fatalf("Failed to parse %s: %v", name, err)
}
return envBool
}
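
A note on the new helper: strconv.ParseBool accepts "1", "t", "T", "TRUE", "true", "True" and the corresponding false spellings; any other non-empty value terminates the process through log.Fatalf. A minimal, self-contained usage sketch, with a hypothetical SOME_FEATURE variable name:

package main

import (
	"fmt"
	"log"
	"os"
	"strconv"
)

// getBoolEnv mirrors the helper added above: an unset variable yields the
// default, an unparsable value is fatal.
func getBoolEnv(name string, defaultValue bool) bool {
	envStr := os.Getenv(name)
	if envStr == "" {
		return defaultValue
	}
	envBool, err := strconv.ParseBool(envStr)
	if err != nil {
		log.Fatalf("Failed to parse %s: %v", name, err)
	}
	return envBool
}

func main() {
	// Unset -> default true; SOME_FEATURE=0 (or "f", "false") -> false.
	if getBoolEnv("SOME_FEATURE", true) {
		fmt.Println("feature enabled")
	}
}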
27 changes: 26 additions & 1 deletion cmd/replicationprober/main.go
@@ -9,14 +9,22 @@ import (
"log"
"net/http"
"os"
"sync"
"time"

"github.com/InfluxCommunity/influxdb3-go/influxdb3"
"github.com/p2004a/spring-rapid-syncer/pkg/bunny"
"github.com/p2004a/spring-rapid-syncer/pkg/sfcache"
)

const UserAgent = "spring-rapid-syncer/prober 1.0"

type Canary struct {
mu sync.Mutex
contents string
t time.Time
}

type Server struct {
http http.Client
bunny *bunny.Client
@@ -27,6 +35,9 @@ type Server struct {
storageEdgeMapCache sfcache.Cache[StorageEdgeMap]
versionsGzCache sfcache.Cache[[]*replicatedFile]
refreshReplicationCanaryPeriod time.Duration
checkReplicationStatusPeriod time.Duration
influxdbClient *influxdb3.Client
latestCanary Canary
}

func getExpectedStorageRegions(ctx context.Context, bunnyClient *bunny.Client, storageZone string) ([]string, error) {
@@ -55,6 +66,15 @@ func main() {
log.Fatalf("Failed to get expected storage regions: %v", err)
}

influxdbClient, err := influxdb3.New(influxdb3.ClientConfig{
Host: getRequiredStrEnv("INFLUXDB_URL"),
Token: getRequiredStrEnv("INFLUXDB_TOKEN"),
Database: getRequiredStrEnv("INFLUXDB_DATABASE"),
})
if err != nil {
log.Fatalf("Failed to create InfluxDB client: %v", err)
}

server := &Server{
http: http.Client{
Timeout: time.Second * 5,
@@ -71,13 +91,18 @@
Timeout: getDurationEnv("VERSION_GZ_CACHE_DURATION", time.Second*10),
},
refreshReplicationCanaryPeriod: getDurationEnv("REFRESH_REPLICATION_CANARY_PERIOD", time.Minute*5),
checkReplicationStatusPeriod: getDurationEnv("CHECK_REPLICATION_STATUS_PERIOD", time.Minute*2),
influxdbClient: influxdbClient,
}

http.HandleFunc("/storageedgemap", server.HandleStorageEdgeMap)
http.HandleFunc("/replicationstatus_versionsgz", server.HandleReplicationStatusVersionsGz)
http.HandleFunc("/replicationstatus", server.HandleReplicationStatus)

go server.startCanaryUpdater(ctx)
if getBoolEnv("ENABLE_STORAGE_REPLICATION_PROBER", true) {
go server.startCanaryUpdater(ctx)
go server.startReplicationStatusChecker(ctx)
}

port := os.Getenv("PORT")
if port == "" {
…
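
Taken together, main.go now requires INFLUXDB_URL, INFLUXDB_TOKEN and INFLUXDB_DATABASE at startup, adds a CHECK_REPLICATION_STATUS_PERIOD knob (default 2m) next to the existing REFRESH_REPLICATION_CANARY_PERIOD (default 5m), and gates both background loops behind ENABLE_STORAGE_REPLICATION_PROBER (default true); the HTTP handlers stay registered either way. A standalone sketch of the influxdb3-go calls this commit depends on, with placeholder host, token and database values:

package main

import (
	"context"
	"log"
	"time"

	"github.com/InfluxCommunity/influxdb3-go/influxdb3"
)

func main() {
	// Placeholder values; the real prober reads these from the
	// environment via getRequiredStrEnv as shown above.
	client, err := influxdb3.New(influxdb3.ClientConfig{
		Host:     "https://influxdb.example.com",
		Token:    "example-token",
		Database: "prober",
	})
	if err != nil {
		log.Fatalf("Failed to create InfluxDB client: %v", err)
	}
	defer client.Close()

	// Same point-building pattern the canary updater uses.
	p := influxdb3.NewPointWithMeasurement("replication_canary_update_result").
		AddField("total", 1).
		AddField("error", 0).
		AddField("ok", 1).
		SetTimestamp(time.Now())

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := client.WritePoints(ctx, p); err != nil {
		log.Printf("WARN: failed to write points: %v", err)
	}
}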
159 changes: 146 additions & 13 deletions cmd/replicationprober/replicationcanary.go
@@ -11,6 +11,7 @@ import (
"strings"
"time"

"github.com/InfluxCommunity/influxdb3-go/influxdb3"
"github.com/p2004a/spring-rapid-syncer/pkg/bunny"
"golang.org/x/sync/errgroup"
)
@@ -27,24 +28,118 @@ const canaryFileName = "replication-canary.txt"
func (s *Server) startCanaryUpdater(ctx context.Context) {
ticker := time.NewTicker(s.refreshReplicationCanaryPeriod)
for {
c, cancel := context.WithTimeout(ctx, s.refreshReplicationCanaryPeriod/2)
start := time.Now()
err := s.updateReplicationCanary(c)
cancel()
var points []*influxdb3.Point
if err != nil {
log.Printf("WARN: Failed to update replication canary: %v", err)
points = append(points,
influxdb3.NewPointWithMeasurement("replication_canary_update_result").
AddField("total", 1).
AddField("error", 1).
AddField("ok", 0).
SetTimestamp(time.Now()))
} else {
latency := time.Since(start)
points = append(points,
influxdb3.NewPointWithMeasurement("replication_canary_update_result").
AddField("total", 1).
AddField("error", 0).
AddField("ok", 1).
SetTimestamp(time.Now()),
influxdb3.NewPointWithMeasurement("replication_canary_update_latency").
AddField("latency", latency.Milliseconds()).
SetTimestamp(time.Now()))
}

c, cancel = context.WithTimeout(ctx, s.refreshReplicationCanaryPeriod/3)
if err := s.influxdbClient.WritePoints(c, points...); err != nil {
log.Printf("WARN: Failed to report replication_canary_update_latency to influxdb: %v", err)
}
cancel()

select {
case <-ctx.Done():
ticker.Stop()
return
case <-ticker.C:
c, cancel := context.WithTimeout(ctx, s.refreshReplicationCanaryPeriod/2)
s.updateReplicationCanary(c)
cancel()
}
}
}

func (s *Server) updateReplicationCanary(ctx context.Context) {
contents := strings.NewReader(time.Now().UTC().Format(time.RFC3339))
err := s.bunnyStorageZone.Upload(ctx, canaryFileName, contents)
func (s *Server) startReplicationStatusChecker(ctx context.Context) {
ticker := time.NewTicker(s.checkReplicationStatusPeriod)
for {
select {
case <-ctx.Done():
ticker.Stop()
return
case <-ticker.C:
}

c, cancel := context.WithTimeout(ctx, s.checkReplicationStatusPeriod/2)
start := time.Now()
rs, err := s.fetchReplicationStatus(c)
cancel()
var points []*influxdb3.Point
if err != nil {
log.Printf("WARN: Failed to fetch replication status: %v", err)
points = append(points,
influxdb3.NewPointWithMeasurement("replication_status_check_result").
AddField("total", 1).
AddField("error", 1).
AddField("ok", 0).
SetTimestamp(time.Now()))
} else {
latency := time.Since(start)

measurementTime := time.Now()

points = append(points,
influxdb3.NewPointWithMeasurement("replication_status_check_result").
AddField("total", 1).
AddField("error", 0).
AddField("ok", 1).
SetTimestamp(measurementTime),
influxdb3.NewPointWithMeasurement("replication_status_check_latency").
AddField("latency", latency.Milliseconds()).
SetTimestamp(measurementTime))

for _, r := range rs {
points = append(points,
influxdb3.NewPointWithMeasurement("replication_status_state").
AddTag("storage_server", r.StorageServer).
AddField("replicated", r.Replicated.Unix()).
AddField("created", r.Created.Unix()).
AddField("unsynced_for", r.UnsyncedFor.Seconds()).
SetTimestamp(measurementTime))
}
}

c, cancel = context.WithTimeout(ctx, s.checkReplicationStatusPeriod/3)
if err := s.influxdbClient.WritePoints(c, points...); err != nil {
log.Printf("WARN: Failed to report replication_status_check_latency to influxdb: %v", err)
}
cancel()
}
}

func (s *Server) updateReplicationCanary(ctx context.Context) error {
canary := time.Now().UTC()
contents := canary.Format(time.RFC3339)

err := s.bunnyStorageZone.Upload(ctx, canaryFileName, strings.NewReader(contents))
if err != nil {
log.Printf("ERROR: Failed to upload replication canary: %v", err)
return fmt.Errorf("failed to upload replication canary: %w", err)
}

s.latestCanary.mu.Lock()
s.latestCanary.t = canary
s.latestCanary.contents = contents
s.latestCanary.mu.Unlock()
return nil
}
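
For illustration, the canary body is just the upload time in RFC 3339 form, so every replica should eventually serve the same single line (timestamp invented):

2023-08-21T14:05:00Z

fetchReplicationStatus below parses that text back with time.Parse(time.RFC3339, ...) and compares it against the in-memory latestCanary to decide whether a storage server is behind.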

func (s *Server) fetchReplicatedFileFromIP(ctx context.Context, ip, expectedSS string, filePath string) (*replicatedFile, error) {
@@ -116,25 +211,63 @@ func (s *Server) fetchReplicatedFile(ctx context.Context, filePath string) ([]*r
return allVersionsGz, nil
}

type ReplicationStatus struct {
StorageServer string
Replicated time.Time
Created time.Time
UnsyncedFor time.Duration
}

func (s *Server) fetchReplicationStatus(ctx context.Context) ([]ReplicationStatus, error) {
canaryFiles, err := s.fetchReplicatedFile(ctx, "/"+canaryFileName)
if err != nil {
return nil, fmt.Errorf("failed to fetch lastes versionsGz: %w", err)
}

s.latestCanary.mu.Lock()
contents := s.latestCanary.contents
canaryTime := s.latestCanary.t
s.latestCanary.mu.Unlock()

rs := make([]ReplicationStatus, len(canaryFiles))
sort.Sort(byServerName(canaryFiles))
for i, ver := range canaryFiles {
created, err := time.Parse(time.RFC3339, string(ver.contents))
if err != nil {
return nil, fmt.Errorf("failed to parse written time: %w", err)
}
var unsyncedFor time.Duration
if contents != "" && string(ver.contents) != contents && canaryTime.Before(created) {
unsyncedFor = time.Since(created.Add(s.refreshReplicationCanaryPeriod))
}
rs[i] = ReplicationStatus{
StorageServer: ver.storageServer,
Replicated: ver.lastModified,
Created: created,
UnsyncedFor: unsyncedFor,
}
}
return rs, nil
}

func (s *Server) HandleReplicationStatus(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
log.Printf("Got %s, not GET request for URL: %v", r.Method, r.URL)
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
return
}

canaryFiles, err := s.fetchReplicatedFile(r.Context(), "/"+canaryFileName)
rs, err := s.fetchReplicationStatus(r.Context())
if err != nil {
log.Printf("Failed to fetch lastes versionsGz: %v", err)
log.Printf("Failed to fetch replication status: %v", err)
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}

var out strings.Builder
sort.Sort(byServerName(canaryFiles))
for _, ver := range canaryFiles {
out.WriteString(fmt.Sprintf("%s: \n etag: %s\n last-modified: %s\n contents: %s\n",
ver.storageServer, ver.etag, ver.lastModified.Format(http.TimeFormat), string(ver.contents)))
for _, ver := range rs {
out.WriteString(fmt.Sprintf("%s: \n created: %s\n replicated: %s\n unsynced for: %s\n",
ver.StorageServer, ver.Created, ver.Replicated, ver.UnsyncedFor))
}
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(http.StatusOK)
…
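
For reference, the points emitted by the two loops serialize to InfluxDB line protocol roughly as below; the measurement, tag and field names match the code above, while the storage_server value, numbers and timestamps are invented. Note that unsynced_for is the only float field (from Seconds()), so it carries no i suffix:

replication_canary_update_result total=1i,error=0i,ok=1i 1692626700000000000
replication_canary_update_latency latency=840i 1692626700000000000
replication_status_check_result total=1i,error=0i,ok=1i 1692626760000000000
replication_status_check_latency latency=512i 1692626760000000000
replication_status_state,storage_server=storage1 replicated=1692626400i,created=1692626400i,unsynced_for=0 1692626760000000000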
17 changes: 17 additions & 0 deletions go.mod
@@ -13,19 +13,36 @@ require (
cloud.google.com/go/compute v1.23.0 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v1.1.2 // indirect
github.com/InfluxCommunity/influxdb3-go v0.2.0 // indirect
github.com/andybalholm/brotli v1.0.4 // indirect
github.com/apache/arrow/go/v12 v12.0.1 // indirect
github.com/apache/thrift v0.16.0 // indirect
github.com/goccy/go-json v0.9.11 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v2.0.8+incompatible // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/s2a-go v0.1.5 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.5 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/influxdata/line-protocol/v2 v2.2.1 // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.15.9 // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.opencensus.io v0.24.0 // indirect
golang.org/x/crypto v0.12.0 // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/net v0.14.0 // indirect
golang.org/x/oauth2 v0.11.0 // indirect
golang.org/x/sys v0.11.0 // indirect
golang.org/x/text v0.12.0 // indirect
golang.org/x/tools v0.6.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/api v0.137.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
…
(diff of the fifth changed file not shown)
