Skip to content

Commit fb84484

Browse files
committed
feat: implement session migration with GoAway message handling
1 parent 0f2404e commit fb84484

File tree

8 files changed

+146
-22
lines changed

8 files changed

+146
-22
lines changed

lib/transport/buffer.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ export class ImmutableBytesBuffer {
143143

144144
getUtf8String(): string {
145145
const len = this.getNumberVarInt()
146+
if (len === 0) return ""
146147
const val = this.getBytes(len)
147148
return new TextDecoder().decode(val)
148149
}

lib/transport/client.ts

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,20 @@ export class Client {
2626
})
2727
}
2828

29-
async connect(): Promise<Connection> {
29+
async createQuic(sessionUri?: string): Promise<WebTransport> {
3030
const options: WebTransportOptions = {}
3131

3232
const fingerprint = await this.#fingerprint
3333
if (fingerprint) options.serverCertificateHashes = [fingerprint]
3434

35-
const quic = new WebTransport(this.config.url, options)
35+
const uri = sessionUri ?? this.config.url
36+
const quic = new WebTransport(uri, options)
3637
await quic.ready
38+
return quic
39+
}
3740

41+
async prepareConnection(quic: WebTransport): Promise<{ control: Stream.ControlStream, objects: Objects }> {
3842
const stream = await quic.createBidirectionalStream({ sendOrder: Number.MAX_SAFE_INTEGER })
39-
4043
const buffer = new ReadableWritableStreamBuffer(stream.readable, stream.writable)
4144

4245
const msg: Control.ClientSetup = {
@@ -57,7 +60,21 @@ export class Client {
5760
const control = new Stream.ControlStream(buffer)
5861
const objects = new Objects(quic)
5962

60-
return new Connection(quic, control, objects)
63+
return { control, objects }
64+
}
65+
66+
async migrateSession(sessionUri?: string): Promise<{ quic: WebTransport, control: Stream.ControlStream, objects: Objects }> {
67+
const quic = await this.createQuic(sessionUri)
68+
const { control, objects } = await this.prepareConnection(quic)
69+
return { quic, control, objects }
70+
}
71+
72+
async connect(sessionUri?: string): Promise<Connection> {
73+
const quic = await this.createQuic(sessionUri)
74+
const { control, objects } = await this.prepareConnection(quic)
75+
const conn = new Connection(quic, control, objects)
76+
conn.onMigration = this.migrateSession.bind(this)
77+
return conn
6178
}
6279

6380
async #fetchFingerprint(url?: string): Promise<WebTransportHash | undefined> {

lib/transport/connection.ts

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@ import { ControlStream } from "./stream"
66
import { Publisher } from "./publisher"
77
import { Subscriber } from "./subscriber"
88

9+
export type MigrationState = "none" | "in_progress" | "done"
10+
911
export class Connection {
12+
#migrationState: MigrationState = "none"
1013
// The established WebTransport session.
1114
#quic: WebTransport
1215

@@ -36,6 +39,11 @@ export class Connection {
3639
this.#running = this.#run()
3740
}
3841

42+
// Callback, should be set when creating connection in the client
43+
onMigration: (sessionUri?: string) => Promise<{ quic: WebTransport, control: ControlStream, objects: Objects }> = async () => {
44+
throw new Error("not implemented")
45+
}
46+
3947
close(code = 0, reason = "") {
4048
this.#quic.close({ closeCode: code, reason })
4149
}
@@ -95,13 +103,43 @@ export class Connection {
95103
}
96104

97105
async #recv(msg: Control.MessageWithType) {
98-
if (Control.isPublisher(msg.type)) {
106+
if (msg.type === Control.ControlMessageType.GoAway) {
107+
await this.#handleGoAway(msg.message)
108+
} else if (Control.isPublisher(msg.type)) {
99109
await this.#subscriber.recv(msg)
100110
} else {
101111
await this.#publisher.recv(msg)
102112
}
103113
}
104114

115+
async #handleGoAway(msg: Control.GoAway) {
116+
console.log("preparing for migration, got go_away message:", msg)
117+
if (this.#migrationState === "in_progress") {
118+
throw new Error("go away received twice")
119+
}
120+
this.#migrationState = "in_progress"
121+
await this.#subscriber.startMigration()
122+
await this.#publisher.startMigration()
123+
124+
// FIXME(itzmanish): is this how we close the quic connection for go_away?
125+
this.#quic.close({ closeCode: 0, reason: "going_away" })
126+
127+
const { quic, control, objects } = await this.onMigration(msg.session_uri)
128+
this.#quic = quic
129+
this.#controlStream = control
130+
this.#objects = objects
131+
this.#migrationState = "done"
132+
133+
await this.#publisher.migrationDone(control, objects)
134+
await this.#subscriber.migrationDone(control, objects)
135+
}
136+
137+
async migrateSession(quic: WebTransport, control: ControlStream, objects: Objects) {
138+
this.#quic = quic
139+
this.#controlStream = control
140+
this.#objects = objects
141+
}
142+
105143
async closed(): Promise<Error> {
106144
try {
107145
await this.#running

lib/transport/control/go_away.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ export namespace GoAway {
2121

2222
export function deserialize(reader: ImmutableBytesBuffer): GoAway {
2323
const session_uri = reader.getUtf8String()
24+
if (session_uri.length > 8192) {
25+
// TODO(itzmanish): should be PROTOCOL_ERROR
26+
throw new Error("session_uri too long")
27+
}
2428
return {
2529
session_uri
2630
}

lib/transport/control/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ type MessageWithType =
5959
| { type: ControlMessageType.SubscribeNamespaceOk; message: SubscribeNamespaceOk }
6060
| { type: ControlMessageType.SubscribeNamespaceError; message: SubscribeNamespaceError }
6161
| { type: ControlMessageType.Unsubscribe; message: Unsubscribe }
62+
| { type: ControlMessageType.GoAway; message: GoAway }
6263

6364
type Message = Subscriber | Publisher
6465

lib/transport/publisher.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,14 @@ import { ControlStream } from "./stream"
33
import { Queue, Watch } from "../common/async"
44
import { Objects, TrackWriter, ObjectDatagramType } from "./objects"
55
import { SubgroupType, SubgroupWriter } from "./subgroup"
6+
import { MigrationState } from "./connection"
67

78
export class Publisher {
89
// Used to send control messages
910
#control: ControlStream
1011

12+
#migrationState: MigrationState = "none"
13+
1114
// Use to send objects.
1215
#objects: Objects
1316

@@ -27,7 +30,21 @@ export class Publisher {
2730
this.#objects = objects
2831
}
2932

33+
async startMigration() {
34+
this.#migrationState = "in_progress"
35+
}
36+
37+
async migrationDone(control: ControlStream, objects: Objects) {
38+
this.#migrationState = "done"
39+
this.#control = control
40+
this.#objects = objects
41+
// NOTE(itzmanish): should we republish all the tracks?
42+
}
43+
3044
async publish_namespace(namespace: string[]): Promise<PublishNamespaceSend> {
45+
if (this.#migrationState === "in_progress") {
46+
throw new Error(`migration in progress`)
47+
}
3148
if (this.#publishedNamespaces.has(namespace.join("/"))) {
3249
throw new Error(`already announced: ${namespace.join("/")}`)
3350
}
@@ -103,6 +120,9 @@ export class Publisher {
103120
}
104121

105122
async recvSubscribe(msg: Control.Subscribe) {
123+
if (this.#migrationState === "in_progress") {
124+
throw new Error(`migration in progress`)
125+
}
106126
try {
107127
if (this.#subscribe.has(msg.id)) {
108128
throw new Error(`duplicate subscribe for id: ${msg.id}`)

lib/transport/stream.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import {
99
Subscribe, SubscribeOk, SubscribeError,
1010
SubscribeUpdate, SubscribeNamespace,
1111
SubscribeNamespaceOk, SubscribeNamespaceError,
12+
GoAway,
1213
} from "./control"
1314
import { debug } from "./utils"
1415
import { ImmutableBytesBuffer, ReadableWritableStreamBuffer, Reader, Writer } from "./buffer"
@@ -94,6 +95,12 @@ export class Decoder {
9495

9596
let res: MessageWithType
9697
switch (t) {
98+
case ControlMessageType.GoAway:
99+
res = {
100+
type: t,
101+
message: GoAway.deserialize(payload),
102+
}
103+
break
97104
case ControlMessageType.Subscribe:
98105
res = {
99106
type: t,

lib/transport/subscriber.ts

Lines changed: 53 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import type { TrackReader } from "./objects"
55
import { debug } from "./utils"
66
import { ControlStream } from "./stream"
77
import { SubgroupReader } from "./subgroup"
8+
import { MigrationState } from "./connection"
89

910
export interface TrackInfo {
1011
track_alias: bigint
@@ -15,6 +16,9 @@ export class Subscriber {
1516
// Use to send control messages.
1617
#control: ControlStream
1718

19+
#migrationState: MigrationState = "none"
20+
#tracksToMigrate = new Set<[string[], string]>()
21+
1822
// Use to send objects.
1923
#objects: Objects
2024

@@ -38,6 +42,24 @@ export class Subscriber {
3842
return this.#publishedNamespacesQueue
3943
}
4044

45+
async startMigration() {
46+
this.#migrationState = "in_progress"
47+
// close all the subscription
48+
this.#trackToIDMap.forEach(async (id, track) => {
49+
await this.unsubscribe(track, true);
50+
})
51+
}
52+
53+
async migrationDone(control: ControlStream, objects: Objects) {
54+
this.#migrationState = "done"
55+
this.#control = control
56+
this.#objects = objects
57+
this.#tracksToMigrate.forEach(async (track) => {
58+
await this.subscribe(track[0], track[1]);
59+
})
60+
this.#tracksToMigrate.clear()
61+
}
62+
4163
async recv(msg: Control.MessageWithType) {
4264
const { type, message } = msg;
4365
switch (type) {
@@ -82,8 +104,10 @@ export class Subscriber {
82104
}
83105

84106
async subscribe_namespace(namespace: string[]) {
107+
if (this.#migrationState === "in_progress") {
108+
throw new Error(`migration in progress`)
109+
}
85110
const id = this.#control.nextRequestId()
86-
// TODO(itzmanish): implement this
87111
const msg: Control.MessageWithType = {
88112
type: Control.ControlMessageType.SubscribeNamespace,
89113
message: {
@@ -95,6 +119,9 @@ export class Subscriber {
95119
}
96120

97121
async subscribe(namespace: string[], track: string) {
122+
if (this.#migrationState === "in_progress") {
123+
throw new Error(`migration in progress`)
124+
}
98125
const id = this.#control.nextRequestId()
99126

100127
const subscribe = new SubscribeSend(this.#control, id, namespace, track)
@@ -122,22 +149,29 @@ export class Subscriber {
122149
return subscribe
123150
}
124151

125-
async unsubscribe(track: string) {
126-
if (this.#trackToIDMap.has(track)) {
127-
const trackID = this.#trackToIDMap.get(track)
128-
if (trackID === undefined) {
129-
console.warn(`Exception track ${track} not found in trackToIDMap.`)
130-
return
131-
}
132-
try {
133-
await this.#control.send({ type: Control.ControlMessageType.Unsubscribe, message: { id: trackID } })
134-
this.#trackToIDMap.delete(track)
135-
} catch (error) {
136-
console.error(`Failed to unsubscribe from track ${track}:`, error)
152+
async unsubscribe(track: string, isMigrating = false) {
153+
const trackID = this.#trackToIDMap.get(track)
154+
155+
if (trackID === undefined) {
156+
console.warn(`Exception track ${track} not found in trackToIDMap.`)
157+
return
158+
}
159+
160+
try {
161+
const subscription = this.#subscribe.get(trackID)
162+
if (subscription) {
163+
this.#subscribe.delete(trackID)
164+
await subscription.close()
165+
if (isMigrating) {
166+
this.#tracksToMigrate.add([subscription.namespace, track])
167+
}
137168
}
138-
} else {
139-
console.warn(`During unsubscribe request initiation attempt track ${track} not found in trackToIDMap.`)
169+
this.#trackToIDMap.delete(track)
170+
171+
} catch (error) {
172+
console.error(`Failed to unsubscribe from track ${track}:`, error)
140173
}
174+
141175
}
142176

143177
recvSubscribeOk(msg: Control.SubscribeOk) {
@@ -265,8 +299,10 @@ export class SubscribeSend {
265299
}
266300

267301
async close(_code = 0n, _reason = "") {
268-
// TODO implement unsubscribe
269-
// await this.#inner.sendReset(code, reason)
302+
this.#control.send({
303+
type: Control.ControlMessageType.Unsubscribe,
304+
message: { id: this.#id }
305+
})
270306
}
271307

272308
onOk(trackAlias: bigint) {

0 commit comments

Comments
 (0)