Skip to content

Commit

Permalink
fold on-demand edge sync message
Browse files Browse the repository at this point in the history
  • Loading branch information
gertd committed Jul 18, 2024
1 parent d2af71c commit a5bb02c
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 18 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ require (
github.com/aserto-dev/go-directory v0.31.5
github.com/aserto-dev/go-directory-cli v0.31.3-0.20240711215128-477b23ffcb75
github.com/aserto-dev/go-edge-ds v0.32.1
github.com/aserto-dev/go-grpc v0.8.65
github.com/aserto-dev/go-grpc v0.8.67
github.com/aserto-dev/go-topaz-ui v0.1.10
github.com/aserto-dev/header v0.0.7
github.com/aserto-dev/logger v0.0.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,8 @@ github.com/aserto-dev/go-directory-cli v0.31.3-0.20240711215128-477b23ffcb75 h1:
github.com/aserto-dev/go-directory-cli v0.31.3-0.20240711215128-477b23ffcb75/go.mod h1:/y3uE2GDZDDJktgWlOzBLhSiZO7kN3Lri31aOPElbPs=
github.com/aserto-dev/go-edge-ds v0.32.1 h1:4kwC6OvVGD+DYtlRwq4eicYBEqn1YKj5S0RatozHm50=
github.com/aserto-dev/go-edge-ds v0.32.1/go.mod h1:nxGsXHP+JzV6ubKoT9XW2L0MIyzC2Dpc7a9adxjXFR0=
github.com/aserto-dev/go-grpc v0.8.65 h1:qj84Ps9nkjAk7SzS+53jrd9PGgmq3ALq8pad2WzYXDY=
github.com/aserto-dev/go-grpc v0.8.65/go.mod h1:ezNdiKzZZCBwpANK6TYNKwe3qTbNFwZB7OEl+v2d/Gw=
github.com/aserto-dev/go-grpc v0.8.67 h1:6DGZiIjIWWFMQ81bLswTqbuQLWS5zycbAQIJsyPmftA=
github.com/aserto-dev/go-grpc v0.8.67/go.mod h1:ezNdiKzZZCBwpANK6TYNKwe3qTbNFwZB7OEl+v2d/Gw=
github.com/aserto-dev/go-topaz-ui v0.1.10 h1:EXgWMOoGvoyTHCCDhXGeCw/W0ABxFlsyka+A1dk/OnQ=
github.com/aserto-dev/go-topaz-ui v0.1.10/go.mod h1:BdW4vOhuG0HkjNH4L4omcNa/RJJ4LaxFzbw92qc+pc8=
github.com/aserto-dev/header v0.0.7 h1:hlo5/zYsBOsxzPxtve7LRbXyBbQmKSPAyOfmPhGgirM=
Expand Down
2 changes: 1 addition & 1 deletion pkg/app/management/command_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func HandleCommand(ctx context.Context, cmd *api.Command, r *runtime.Runtime) er
if !ok {
return errors.Errorf("failed to cast discovery plugin")
}

r.Logger.Warn().Str("modes", edge.PrintMode(msg.SyncEdgeDirectory.Mode)).Msg("COMMAND HANDLER")
edgePlugin.SyncNow(msg.SyncEdgeDirectory.Mode)

default:
Expand Down
79 changes: 65 additions & 14 deletions plugins/edge/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package edge
import (
"context"
"strings"
"sync/atomic"
"time"

"github.com/aserto-dev/go-aserto/client"
Expand Down Expand Up @@ -148,7 +149,11 @@ func (p *Plugin) scheduler() {
interval := time.NewTicker(1 * time.Second)
defer interval.Stop()

var running atomic.Bool
running.Store(false)

cycle := cycles
syncMode := api.SyncMode_SYNC_MODE_UNKNOWN

for {
select {
Expand All @@ -160,32 +165,50 @@ func (p *Plugin) scheduler() {
p.logger.Info().Time("dispatch", t).Msg(syncScheduler)
interval.Stop()

p.task(api.SyncMode_SYNC_MODE_WATERMARK) // watermark sync
syncMode = api.SyncMode_SYNC_MODE_WATERMARK

if cycle%cycles == 0 {
p.task(api.SyncMode_SYNC_MODE_DIFF)
syncMode = api.SyncMode_SYNC_MODE_DIFF
cycle = 0
}
cycle++
p.logger.Warn().Str("modes", PrintMode(syncMode)).Msg("INTERVAL")

case mode := <-p.syncNow:
p.logger.Warn().Time("dispatch", time.Now()).Msg(syncOnDemand)
interval.Stop()

p.task(mode)
syncMode = Fold(syncMode, mode)
p.logger.Warn().Str("modes", PrintMode(syncMode)).Msg("ON-DEMAND")
}

// calculate the interval in secs
//
// p.config.SyncInterval 1m-60m
// 1m -> 60s -> 15s interval
// 5m -> 300s -> 75s interval
// 60m -> 3600s -> 900s interval
waitInSec := (p.config.SyncInterval * 60) / cycles

wait := time.Duration(waitInSec) * time.Second
interval.Reset(wait)
p.logger.Info().Str("interval", wait.String()).Time("next-run", time.Now().Add(wait)).Msg(syncScheduler)
if !running.Load() {
runMode := syncMode
syncMode = api.SyncMode_SYNC_MODE_UNKNOWN
go func() {
p.logger.Warn().Str("modes", PrintMode(runMode)).Msg("TASK STARTED RUNNING")

running.Store(true)
defer func() {
p.logger.Warn().Str("modes", PrintMode(runMode)).Msg("TASK STOPPED RUNNING")
running.Store(false)
}()

p.task(runMode)

// calculate the interval in secs
//
// p.config.SyncInterval 1m-60m
// 1m -> 60s -> 15s interval
// 5m -> 300s -> 75s interval
// 60m -> 3600s -> 900s interval
waitInSec := (p.config.SyncInterval * 60) / cycles

wait := time.Duration(waitInSec) * time.Second
interval.Reset(wait)
p.logger.Info().Str("interval", wait.String()).Time("next-run", time.Now().Add(wait)).Msg(syncScheduler)
}()
}
}
}

Expand Down Expand Up @@ -268,3 +291,31 @@ func (p *Plugin) remoteDirectoryClient(ctx context.Context) (*grpc.ClientConn, e

return conn, nil
}

func Fold(m ...api.SyncMode) api.SyncMode {
r := api.SyncMode_SYNC_MODE_UNKNOWN
for _, v := range m {
r |= v
}
return r
}

func PrintMode(mode api.SyncMode) string {
modes := []string{}
if mode&api.SyncMode_SYNC_MODE_MANIFEST != 0 {
modes = append(modes, "MANIFEST")
}
if mode&api.SyncMode_SYNC_MODE_FULL != 0 {
modes = append(modes, "FULL")
}
if mode&api.SyncMode_SYNC_MODE_DIFF != 0 {
modes = append(modes, "DIFF")
}
if mode&api.SyncMode_SYNC_MODE_WATERMARK != 0 {
modes = append(modes, "WATERMARK")
}
if mode == api.SyncMode_SYNC_MODE_UNKNOWN {
modes = append(modes, "UNKNOWN")
}
return strings.Join(modes, " | ")
}
34 changes: 34 additions & 0 deletions plugins/edge/plugin_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package edge_test

import (
"testing"

"github.com/aserto-dev/go-grpc/aserto/api/v2"
"github.com/aserto-dev/topaz/plugins/edge"
"github.com/stretchr/testify/assert"
)

func TestFold(t *testing.T) {
t.Log("TestFold")
assert.Equal(t, api.SyncMode(0), edge.Fold(api.SyncMode_SYNC_MODE_UNKNOWN))
assert.Equal(t, api.SyncMode(0), edge.Fold(api.SyncMode_SYNC_MODE_UNKNOWN, api.SyncMode_SYNC_MODE_UNKNOWN))
assert.Equal(t, api.SyncMode(8), edge.Fold(api.SyncMode_SYNC_MODE_MANIFEST, api.SyncMode_SYNC_MODE_MANIFEST))
assert.Equal(t, api.SyncMode(1), edge.Fold(api.SyncMode_SYNC_MODE_UNKNOWN, api.SyncMode_SYNC_MODE_FULL))
assert.Equal(t, api.SyncMode(2), edge.Fold(api.SyncMode_SYNC_MODE_DIFF))
assert.Equal(t, api.SyncMode(3), edge.Fold(api.SyncMode_SYNC_MODE_FULL, api.SyncMode_SYNC_MODE_DIFF))
assert.Equal(t, api.SyncMode(7), edge.Fold(api.SyncMode_SYNC_MODE_FULL, api.SyncMode_SYNC_MODE_DIFF, api.SyncMode_SYNC_MODE_WATERMARK))
assert.Equal(t, api.SyncMode(15), edge.Fold(api.SyncMode_SYNC_MODE_FULL, api.SyncMode_SYNC_MODE_DIFF, api.SyncMode_SYNC_MODE_WATERMARK, api.SyncMode_SYNC_MODE_MANIFEST))
all1 := edge.Fold(api.SyncMode_SYNC_MODE_FULL, api.SyncMode_SYNC_MODE_DIFF, api.SyncMode_SYNC_MODE_WATERMARK, api.SyncMode_SYNC_MODE_MANIFEST)
all2 := edge.Fold(api.SyncMode_SYNC_MODE_FULL, api.SyncMode_SYNC_MODE_DIFF, api.SyncMode_SYNC_MODE_WATERMARK, api.SyncMode_SYNC_MODE_MANIFEST)
assert.Equal(t, api.SyncMode(15), edge.Fold(all1, all2))
assert.Equal(t, api.SyncMode(4), edge.Fold(api.SyncMode_SYNC_MODE_WATERMARK, api.SyncMode_SYNC_MODE_WATERMARK, api.SyncMode_SYNC_MODE_WATERMARK))

req := &api.Command_SyncEdgeDirectory{
SyncEdgeDirectory: &api.SyncEdgeDirectoryCommand{
Mode: api.SyncMode_SYNC_MODE_UNKNOWN,
},
}
assert.Equal(t, api.SyncMode(0), edge.Fold(req.SyncEdgeDirectory.Mode))

t.Log(edge.PrintMode(api.SyncMode_SYNC_MODE_UNKNOWN))
}

0 comments on commit a5bb02c

Please sign in to comment.