-
Notifications
You must be signed in to change notification settings - Fork 6
/
statestore_test.go
108 lines (99 loc) · 2.47 KB
/
statestore_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package kinesumer
import (
"context"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/guregu/dynamo"
"github.com/stretchr/testify/assert"
"testing"
"time"
)
func newTestDynamoDB(t *testing.T) *dynamo.DB {
awsCfg := aws.NewConfig()
awsCfg.WithRegion("ap-northeast-2")
awsCfg.WithEndpoint("http://localhost:14566")
sess, err := session.NewSession(awsCfg)
if err != nil {
t.Fatal("failed to init test env:", err.Error())
}
return dynamo.New(sess)
}
func cleanUpStateStore(t *testing.T, store *stateStore) {
type PkSk struct {
PK string `dynamo:"pk"`
SK string `dynamo:"sk"`
}
var (
pksks []*PkSk
keys []dynamo.Keyed
)
table := store.db.table
if err := table.Scan().All(&pksks); err != nil {
t.Fatal("failed to scan the state table:", err.Error())
}
for _, pksk := range pksks {
keys = append(keys, &dynamo.Keys{pksk.PK, pksk.SK})
}
if _, err := table.
Batch("pk", "sk").
Write().
Delete(keys...).
Run(); err != nil {
t.Fatal("failed to delete all test data:", err.Error())
}
}
func TestStateStore_UpdateCheckPointsWorksFine(t *testing.T) {
cfg := &Config{
App: "test",
DynamoDBRegion: "ap-northeast-2",
DynamoDBTable: "kinesumer-state-store",
DynamoDBEndpoint: "http://localhost:14566",
}
store, err := newStateStore(cfg)
assert.NoError(t, err, "there should be no error")
s, _ := store.(*stateStore)
defer cleanUpStateStore(t, s)
expectedUpdatedAt := time.Date(2022, 7, 12, 12, 35, 0, 0, time.UTC)
expected := []*stateCheckPoint{
{
StreamKey: buildCheckPointKey("test", "foobar"),
ShardID: "shardId-000",
SequenceNumber: "0",
LastUpdate: expectedUpdatedAt,
},
}
err = s.UpdateCheckPoints(
context.Background(),
[]*ShardCheckPoint{
{
Stream: "foobar",
ShardID: "shardId-000",
SequenceNumber: "0",
UpdatedAt: expectedUpdatedAt,
},
},
)
if assert.NoError(t, err, "there should be no error") {
assert.Eventually(
t,
func() bool {
var result []*stateCheckPoint
err := s.db.table.
Batch("pk", "sk").
Get(
[]dynamo.Keyed{
dynamo.Keys{buildCheckPointKey("test", "foobar"), "shardId-000"},
dynamo.Keys{buildCheckPointKey("test", "foo"), "shardId-001"},
}...,
).All(&result)
if assert.NoError(t, err) {
return assert.EqualValues(t, expected, result)
}
return false
},
600*time.Millisecond,
100*time.Millisecond,
"they should be equal",
)
}
}