Skip to content

Commit b0b9a48

Browse files
committed
consumer: Apply RPC takes optional hints to set on upsert
Certain migration use-cases require that shards new to a cluster also have pre-existing FSM hints available for their recovery log. Piggy-back optional FSM hints onto the Apply RPC which creates the shard, so that they're created in the same Etcd transaction.
1 parent d01f364 commit b0b9a48

File tree

6 files changed

+231
-130
lines changed

6 files changed

+231
-130
lines changed

consumer/protocol/protocol.pb.go

Lines changed: 182 additions & 124 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

consumer/protocol/protocol.proto

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,11 @@ message ApplyRequest {
358358
ShardSpec upsert = 2;
359359
// Shard to be deleted. expect_mod_revision must not be zero.
360360
string delete = 3 [ (gogoproto.casttype) = "ShardID" ];
361+
// (Optional) Primary hints to set for the shard. This is rarely required,
362+
// as shards start with an empty recovery log and track their own hints,
363+
// but is useful in some advanced migration scenarios.
364+
// If set, then this Change must be an upsert.
365+
recoverylog.FSMHints primary_hints = 4;
361366
}
362367
repeated Change changes = 1 [ (gogoproto.nullable) = false ];
363368
// Optional extension of the ApplyRequest.

consumer/protocol/rpc_extensions.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,11 +97,19 @@ func (m *ApplyRequest_Change) Validate() error {
9797
} else if m.ExpectModRevision < 0 && (m.ExpectModRevision != -1) {
9898
return pb.NewValidationError("invalid ExpectModRevision (%d; expected >= 0 or -1)", m.ExpectModRevision)
9999
}
100+
101+
if m.PrimaryHints == nil {
102+
// Pass.
103+
} else if err := m.PrimaryHints.Validate(); err != nil {
104+
return pb.ExtendContext(err, "PrimaryHints")
105+
}
100106
} else if m.Delete != "" {
101107
if err := m.Delete.Validate(); err != nil {
102108
return pb.ExtendContext(err, "Delete")
103109
} else if m.ExpectModRevision <= 0 && (m.ExpectModRevision != -1) {
104110
return pb.NewValidationError("invalid ExpectModRevision (%d; expected > 0 or -1)", m.ExpectModRevision)
111+
} else if m.PrimaryHints != nil {
112+
return pb.NewValidationError("hints may be set only with an upsert, not a delete")
105113
}
106114
} else {
107115
return pb.NewValidationError("neither Upsert nor Delete are set (expected exactly one)")

consumer/protocol/rpc_extensions_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package protocol
22

33
import (
44
pb "go.gazette.dev/core/broker/protocol"
5+
"go.gazette.dev/core/consumer/recoverylog"
56
gc "gopkg.in/check.v1"
67
)
78

@@ -106,10 +107,18 @@ func (s *RPCSuite) TestApplyRequestValidationCases(c *gc.C) {
106107
MaxTxnDuration: 1,
107108
},
108109
Delete: "another-id",
110+
PrimaryHints: &recoverylog.FSMHints{
111+
Log: "valid",
112+
LiveNodes: []recoverylog.FnodeSegments{{
113+
Fnode: 32,
114+
Segments: []recoverylog.Segment{{Author: 0, FirstSeqNo: 1, LastSeqNo: 2}},
115+
}},
116+
},
109117
},
110118
{
111119
ExpectModRevision: 0,
112120
Delete: "another invalid id",
121+
PrimaryHints: &recoverylog.FSMHints{Log: "invalid"},
113122
},
114123
{
115124
ExpectModRevision: 1,
@@ -123,10 +132,14 @@ func (s *RPCSuite) TestApplyRequestValidationCases(c *gc.C) {
123132
req.Changes[0].Upsert.Id = "a-valid-id"
124133
c.Check(req.Validate(), gc.ErrorMatches, `Changes\[0\]: invalid ExpectModRevision \(-2; expected >= 0 or -1\)`)
125134
req.Changes[0].ExpectModRevision = 0
135+
c.Check(req.Validate(), gc.ErrorMatches, `Changes\[0\].PrimaryHints.Segment: Author is zero`)
136+
req.Changes[0].PrimaryHints.LiveNodes[0].Segments[0].Author = 1
126137
c.Check(req.Validate(), gc.ErrorMatches, `Changes\[1\].Delete: not a valid token \(another invalid id\)`)
127138
req.Changes[1].Delete = "other-valid-id"
128139
c.Check(req.Validate(), gc.ErrorMatches, `Changes\[1\]: invalid ExpectModRevision \(0; expected > 0 or -1\)`)
129140
req.Changes[1].ExpectModRevision = 1
141+
c.Check(req.Validate(), gc.ErrorMatches, `Changes\[1\]: hints may be set only with an upsert, not a delete`)
142+
req.Changes[1].PrimaryHints = nil
130143
c.Check(req.Validate(), gc.ErrorMatches, `Changes\[2\]: neither Upsert nor Delete are set \(expected exactly one\)`)
131144
req.Changes[2].Delete = "yet-another-valid-id"
132145

consumer/shard_api.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package consumer
22

33
import (
44
"context"
5+
"encoding/json"
56
"fmt"
67
"strings"
78

@@ -126,6 +127,14 @@ func ShardApply(ctx context.Context, claims pb.Claims, srv *Service, req *pc.App
126127
}
127128
key = allocator.ItemKey(s.KS, change.Upsert.Id.String())
128129
ops = append(ops, clientv3.OpPut(key, change.Upsert.MarshalString()))
130+
131+
if change.PrimaryHints != nil {
132+
var val, err = json.Marshal(change.PrimaryHints)
133+
if err != nil {
134+
return nil, errors.WithMessage(err, "json.Marshal(hints)")
135+
}
136+
ops = append(ops, clientv3.OpPut(change.Upsert.HintPrimaryKey(), string(val)))
137+
}
129138
} else {
130139
if err := authorizeShard(&claims, change.Delete); err != nil {
131140
return nil, err

consumer/shard_api_test.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -145,11 +145,12 @@ func TestAPIApplyCases(t *testing.T) {
145145
var tf, cleanup = newTestFixture(t)
146146
defer cleanup()
147147

148+
var ctx = context.Background()
148149
var specA = makeShard(shardA)
149150
var specB = makeShard(shardB)
150151

151152
var verifyAndFetchRev = func(id pc.ShardID, expect pc.ShardSpec) int64 {
152-
var resp, err = tf.service.List(context.Background(), allClaims, &pc.ListRequest{
153+
var resp, err = tf.service.List(ctx, allClaims, &pc.ListRequest{
153154
Selector: pb.LabelSelector{Include: pb.MustLabelSet("id", id.String())},
154155
})
155156
require.NoError(t, err)
@@ -158,7 +159,7 @@ func TestAPIApplyCases(t *testing.T) {
158159
return resp.Shards[0].ModRevision
159160
}
160161
var apply = func(req *pc.ApplyRequest) *pc.ApplyResponse {
161-
var resp, err = tf.service.Apply(context.Background(), allClaims, req)
162+
var resp, err = tf.service.Apply(ctx, allClaims, req)
162163
require.NoError(t, err)
163164
return resp
164165
}
@@ -167,10 +168,17 @@ func TestAPIApplyCases(t *testing.T) {
167168
require.Equal(t, pc.Status_OK, apply(&pc.ApplyRequest{
168169
Changes: []pc.ApplyRequest_Change{
169170
{Upsert: specA},
170-
{Upsert: specB},
171+
{Upsert: specB, PrimaryHints: &recoverylog.FSMHints{
172+
Log: specB.RecoveryLog(),
173+
Properties: []recoverylog.Property{{Path: "hello", Content: "shard"}}}},
171174
},
172175
}).Status)
173176

177+
var hints, err = tf.service.GetHints(ctx, allClaims, &pc.GetHintsRequest{Shard: shardB})
178+
require.NoError(t, err)
179+
require.Equal(t, recoverylog.Property{Path: "hello", Content: "shard"},
180+
hints.PrimaryHints.Hints.Properties[0])
181+
174182
// Case: Update existing spec B.
175183
var origSpecB = *specB
176184
specB.Labels = append(specB.Labels, pb.Label{Name: "foo", Value: "bar"})
@@ -217,19 +225,19 @@ func TestAPIApplyCases(t *testing.T) {
217225
}).Status)
218226

219227
// Case: Insufficient claimed selector on delete.
220-
var _, err = tf.service.Apply(context.Background(), noClaims, &pc.ApplyRequest{
228+
_, err = tf.service.Apply(ctx, noClaims, &pc.ApplyRequest{
221229
Changes: []pc.ApplyRequest_Change{{Delete: "shard-A", ExpectModRevision: -1}},
222230
})
223231
require.EqualError(t, err, `rpc error: code = Unauthenticated desc = not authorized to shard-A`)
224232

225233
// Case: Insufficient claimed selector on upsert.
226-
_, err = tf.service.Apply(context.Background(), noClaims, &pc.ApplyRequest{
234+
_, err = tf.service.Apply(ctx, noClaims, &pc.ApplyRequest{
227235
Changes: []pc.ApplyRequest_Change{{Upsert: specB}},
228236
})
229237
require.EqualError(t, err, `rpc error: code = Unauthenticated desc = not authorized to shard-B`)
230238

231239
// Case: Invalid requests fail with an error.
232-
_, err = tf.service.Apply(context.Background(), allClaims, &pc.ApplyRequest{
240+
_, err = tf.service.Apply(ctx, allClaims, &pc.ApplyRequest{
233241
Changes: []pc.ApplyRequest_Change{{Delete: "invalid shard id"}},
234242
})
235243
require.EqualError(t, err, `Changes[0].Delete: not a valid token (invalid shard id)`)

0 commit comments

Comments
 (0)