diff --git a/src/browser/sync/authentication_manager.ts b/src/browser/sync/authentication_manager.ts index b7da9ea..06b0622 100644 --- a/src/browser/sync/authentication_manager.ts +++ b/src/browser/sync/authentication_manager.ts @@ -7,6 +7,8 @@ import jwtDecode from "jwt-decode"; // schedule about 24 days in the future. const MAXIMUM_REFRESH_DELAY = 20 * 24 * 60 * 60 * 1000; // 20 days +const TOKEN_CONFIRMATION_RETRIES = 2; + /** * An async function returning the JWT-encoded OpenID Connect Identity Token * if available. @@ -92,6 +94,7 @@ export class AuthenticationManager { private readonly clearAuth: () => void; private readonly logger: Logger; private readonly refreshTokenLeewaySeconds: number; + private tokenConfirmationRetries = TOKEN_CONFIRMATION_RETRIES; constructor( syncState: LocalSyncState, callbacks: { @@ -138,8 +141,6 @@ export class AuthenticationManager { hasRetried: false, }); this.authenticate(token.value); - this._logVerbose("resuming WS after auth token fetch"); - this.resumeSocket(); } else { this.setAuthState({ state: "initialRefetch", @@ -148,6 +149,8 @@ export class AuthenticationManager { // Try again with `forceRefreshToken: true` await this.refetchToken(); } + this._logVerbose("resuming WS after auth token fetch"); + this.resumeSocket(); } onTransition(serverMessage: Transition) { @@ -176,6 +179,7 @@ export class AuthenticationManager { if (this.authState.state === "waitingForServerConfirmationOfFreshToken") { this._logVerbose("server confirmed new auth token is valid"); this.scheduleTokenRefetch(this.authState.token); + this.tokenConfirmationRetries = TOKEN_CONFIRMATION_RETRIES; if (!this.authState.hadAuth) { this.authState.config.onAuthChange(true); } @@ -183,6 +187,18 @@ export class AuthenticationManager { } onAuthError(serverMessage: AuthError) { + // If auth error comes from a query/mutation/action and the client + // is waiting for the server to confirm a token, ignore. + // TODO: This shouldn't rely on a specific error text, make less brittle. + // May require backend changes. + if ( + serverMessage.error === "Convex token identity expired" && + (this.authState.state === "waitingForServerConfirmationOfFreshToken" || + this.authState.state === "waitingForServerConfirmationOfCachedToken") + ) { + this._logVerbose("ignoring non-auth token expired error"); + return; + } const { baseVersion } = serverMessage; // Versioned AuthErrors are ignored if the client advanced to // a newer auth identity @@ -206,12 +222,13 @@ export class AuthenticationManager { // in that we pause the WebSocket so that mutations // don't retry with bad auth. private async tryToReauthenticate(serverMessage: AuthError) { - // We only retry once, to avoid infinite retries + this._logVerbose(`attempting to reauthenticate: ${serverMessage.error}`); if ( // No way to fetch another token, kaboom this.authState.state === "noAuth" || // We failed on a fresh token, trying another one won't help - this.authState.state === "waitingForServerConfirmationOfFreshToken" + (this.authState.state === "waitingForServerConfirmationOfFreshToken" && + this.tokenConfirmationRetries <= 0) ) { this.logger.error( `Failed to authenticate: "${serverMessage.error}", check your server auth config`, @@ -224,7 +241,13 @@ export class AuthenticationManager { } return; } - this._logVerbose("attempting to reauthenticate"); + if (this.authState.state === "waitingForServerConfirmationOfFreshToken") { + this.tokenConfirmationRetries--; + this._logVerbose( + `retrying reauthentication, ${this.tokenConfirmationRetries} retries remaining`, + ); + } + await this.stopSocket(); const token = await this.fetchTokenAndGuardAgainstRace( this.authState.config.fetchToken, @@ -296,12 +319,12 @@ export class AuthenticationManager { } this.setAndReportAuthFailed(this.authState.config.onAuthChange); } - // Resuming in case this refetch was triggered - // by an invalid cached token. + // Restart in case this refetch was triggered via schedule during + // a reauthentication attempt. this._logVerbose( - "resuming WS after auth token fetch (if currently paused)", + "restarting WS after auth token fetch (if currently stopped)", ); - this.resumeSocket(); + this.restartSocket(); } private scheduleTokenRefetch(token: string) { @@ -353,6 +376,7 @@ export class AuthenticationManager { delay = 0; } const refetchTokenTimeoutId = setTimeout(() => { + this._logVerbose("running scheduled token refetch"); void this.refetchToken(); }, delay); this.setAuthState({ @@ -374,9 +398,15 @@ export class AuthenticationManager { }, ) { const originalConfigVersion = ++this.configVersion; + this._logVerbose( + `fetching token with config version ${originalConfigVersion}`, + ); const token = await fetchToken(fetchArgs); if (this.configVersion !== originalConfigVersion) { // This is a stale config + this._logVerbose( + `stale config version, expected ${originalConfigVersion}, got ${this.configVersion}`, + ); return { isFromOutdatedConfig: true }; } return { isFromOutdatedConfig: false, value: token }; @@ -386,6 +416,7 @@ export class AuthenticationManager { this.resetAuthState(); // Bump this in case we are mid-token-fetch when we get stopped this.configVersion++; + this._logVerbose(`config version bumped to ${this.configVersion}`); } private setAndReportAuthFailed( @@ -400,6 +431,18 @@ export class AuthenticationManager { } private setAuthState(newAuth: AuthState) { + const authStateForLog = + newAuth.state === "waitingForServerConfirmationOfFreshToken" + ? { + hadAuth: newAuth.hadAuth, + state: newAuth.state, + token: `...${newAuth.token.slice(-7)}`, + } + : { state: newAuth.state }; + this._logVerbose( + `setting auth state to ${JSON.stringify(authStateForLog)}`, + ); + if (this.authState.state === "waitingForScheduledRefetch") { clearTimeout(this.authState.refetchTokenTimeoutId); diff --git a/src/browser/sync/web_socket_manager.ts b/src/browser/sync/web_socket_manager.ts index 029f24f..18550b7 100644 --- a/src/browser/sync/web_socket_manager.ts +++ b/src/browser/sync/web_socket_manager.ts @@ -158,6 +158,15 @@ export class WebSocketManager { this.connect(); } + private setSocketState(state: Socket) { + this.socket = state; + this._logVerbose( + `socket state changed: ${this.socket.state}, paused: ${ + "paused" in this.socket ? this.socket.paused : undefined + }`, + ); + } + private connect() { if (this.socket.state === "terminated") { return; @@ -173,11 +182,11 @@ export class WebSocketManager { const ws = new this.webSocketConstructor(this.uri); this._logVerbose("constructed WebSocket"); - this.socket = { + this.setSocketState({ state: "connecting", ws, paused: "no", - }; + }); // Kick off server inactivity timer before WebSocket connection is established // so we can detect cases where handshake fails. @@ -190,11 +199,11 @@ export class WebSocketManager { if (this.socket.state !== "connecting") { throw new Error("onopen called with socket not in connecting state"); } - this.socket = { + this.setSocketState({ state: "ready", ws, paused: this.socket.paused === "yes" ? "uninitialized" : "no", - }; + }); this.resetServerInactivityTimeout(); if (this.socket.paused === "no") { this.onOpen({ @@ -259,8 +268,14 @@ export class WebSocketManager { * @returns Whether the message (might have been) sent. */ sendMessage(message: ClientMessage) { - this._logVerbose(`sending message with type ${message.type}`); - + const messageForLog = { + type: message.type, + ...(message.type === "Authenticate" && message.tokenType === "User" + ? { + value: `...${message.value.slice(-7)}`, + } + : {}), + }; if (this.socket.state === "ready" && this.socket.paused === "no") { const encodedMessage = encodeClientMessage(message); const request = JSON.stringify(encodedMessage); @@ -273,8 +288,19 @@ export class WebSocketManager { this.closeAndReconnect("FailedToSendMessage"); } // We are not sure if this was sent or not. + this._logVerbose( + `sent message with type ${message.type}: ${JSON.stringify( + messageForLog, + )}`, + ); return true; } + this._logVerbose( + `message not sent (socket state: ${this.socket.state}, paused: ${"paused" in this.socket ? this.socket.paused : undefined}): ${JSON.stringify( + messageForLog, + )}`, + ); + return false; } @@ -388,7 +414,7 @@ export class WebSocketManager { case "connecting": case "ready": { const result = this.close(); - this.socket = { state: "terminated" }; + this.setSocketState({ state: "terminated" }); return result; } default: { @@ -431,12 +457,11 @@ export class WebSocketManager { case "stopped": break; case "terminated": - // If we're terminating we ignore restart - return; case "connecting": case "ready": case "disconnected": - throw new Error("`restart()` is only valid after `stop()`"); + this.logger.warn("Restart called without stopping first"); + return; default: { // Enforce that the switch-case is exhaustive. const _: never = this.socket; diff --git a/src/react/auth_websocket.test.tsx b/src/react/auth_websocket.test.tsx index b91973a..f7d2074 100644 --- a/src/react/auth_websocket.test.tsx +++ b/src/react/auth_websocket.test.tsx @@ -199,8 +199,14 @@ describe.sequential.skip("auth websocket tests", () => { // Do nothing }); - let token = jwtEncode({ iat: 1234500, exp: 1244500 }, "wobabloobla"); - const tokenFetcher = vi.fn(async () => token); + const tokens = [ + jwtEncode({ iat: 1234500, exp: 1244500 }, "secret1"), + jwtEncode({ iat: 1234500, exp: 1244500 }, "secret2"), + jwtEncode({ iat: 1234500, exp: 1244500 }, "secret3"), + jwtEncode({ iat: 1234500, exp: 1244500 }, "secret4"), + ]; + + const tokenFetcher = vi.fn(async () => tokens.shift()); const onAuthChange = vi.fn(); client.setAuth(tokenFetcher, onAuthChange); @@ -208,9 +214,6 @@ describe.sequential.skip("auth websocket tests", () => { expect((await receive()).type).toEqual("Authenticate"); expect((await receive()).type).toEqual("ModifyQuerySet"); - // Token must change, otherwise client will not try to reauthenticate - token = jwtEncode({ iat: 1234500, exp: 1244500 }, "secret"); - send({ type: "AuthError", error: "bla", @@ -218,7 +221,7 @@ describe.sequential.skip("auth websocket tests", () => { }); close(); - // The client reconnects automatically + // The client reconnects automatically and retries twice expect((await receive()).type).toEqual("Connect"); expect((await receive()).type).toEqual("Authenticate"); @@ -231,6 +234,26 @@ describe.sequential.skip("auth websocket tests", () => { }); close(); + expect((await receive()).type).toEqual("Connect"); + expect((await receive()).type).toEqual("Authenticate"); + expect((await receive()).type).toEqual("ModifyQuerySet"); + + send({ + type: "AuthError", + error: AUTH_ERROR_MESSAGE, + }); + close(); + + expect((await receive()).type).toEqual("Connect"); + expect((await receive()).type).toEqual("Authenticate"); + expect((await receive()).type).toEqual("ModifyQuerySet"); + + send({ + type: "AuthError", + error: AUTH_ERROR_MESSAGE, + }); + close(); + // The client reconnects automatically expect((await receive()).type).toEqual("Connect"); expect((await receive()).type).toEqual("ModifyQuerySet"); @@ -334,6 +357,300 @@ describe.sequential.skip("auth websocket tests", () => { }); }); + // This is a race condition where a delayed auth error from a non-auth message + // comes back while the client is waiting for server validation of the new token. + test("Client ignores non-auth responses for token validation", async () => { + await withInMemoryWebSocket(async ({ address, receive, send }) => { + const client = testReactClient(address); + const ts = Math.ceil(Date.now() / 1000); + const tokens = [ + jwtEncode({ iat: ts, exp: ts + 1000 }, "token1"), + jwtEncode({ iat: ts, exp: ts + 1000 }, "token2"), + ]; + const tokenFetcher = vi.fn(async (_opts) => tokens.shift()!); + const onChange = vi.fn(); + client.setAuth(tokenFetcher, onChange); + + expect((await receive()).type).toEqual("Connect"); + expect((await receive()).type).toEqual("Authenticate"); + expect((await receive()).type).toEqual("ModifyQuerySet"); + + const querySetVersion = client.sync["remoteQuerySet"]["version"]; + + send({ + type: "Transition", + startVersion: { + ...querySetVersion, + identity: 0, + }, + endVersion: { + ...querySetVersion, + identity: 1, + }, + modifications: [], + }); + + expect((await receive()).type).toEqual("Authenticate"); + + // This auth error text is specific to query/mutation/action related auth + // errors, which should be ignored for token validation. + send({ + type: "AuthError", + error: "Convex token identity expired", + baseVersion: 1, + }); + + send({ + type: "Transition", + startVersion: { + ...querySetVersion, + identity: 1, + }, + endVersion: { + ...querySetVersion, + identity: 2, + }, + modifications: [], + }); + + await new Promise((resolve) => setTimeout(resolve)); + await client.close(); + + expect(tokenFetcher).toHaveBeenCalledTimes(2); + expect(tokenFetcher).toHaveBeenNthCalledWith(1, { + forceRefreshToken: false, + }); + expect(tokenFetcher).toHaveBeenNthCalledWith(2, { + forceRefreshToken: true, + }); + expect(onChange).toHaveBeenCalledTimes(2); + expect(onChange).toHaveBeenNthCalledWith(1, true); + // Without proper handling, this second call will be false + expect(onChange).toHaveBeenNthCalledWith(2, true); + }); + }); + + // This is a race condition where a connection stopped by reauthentication + // never restarts due to reauthentication exiting early. This happens when + // an additional refetch begins while reauthentication is still running, + // such as with a scheduled refetch. + test("Client maintains connection when refetch occurs during reauth attempt", async () => { + await withInMemoryWebSocket(async ({ address, receive, send, close }) => { + vi.useFakeTimers(); + const client = testReactClient(address); + // Tokens have a 3 second expiration, scheduled refetch occurs 2 seconds + // prior to expiration (so 1 second after token validation completes). + const tokens = [ + (ts: number) => jwtEncode({ iat: ts, exp: ts + 3 }, "token1"), + (ts: number) => jwtEncode({ iat: ts, exp: ts + 3 }, "token2"), + (ts: number) => jwtEncode({ iat: ts, exp: ts + 3 }, "token3"), + (ts: number) => jwtEncode({ iat: ts, exp: ts + 3 }, "token4"), + ]; + const tokenFetcher = vi.fn(async (_opts) => { + // Simulate a one second delay in token fetching - long enough to + // cause a scheduled refetch to occur while reauth is still running. + vi.advanceTimersByTime(1000); + return tokens.shift()!(Math.ceil(Date.now() / 1000)); + }); + const onChange = vi.fn(); + client.setAuth(tokenFetcher, onChange); + + expect((await receive()).type).toEqual("Connect"); + expect((await receive()).type).toEqual("Authenticate"); + expect((await receive()).type).toEqual("ModifyQuerySet"); + + const querySetVersion = client.sync["remoteQuerySet"]["version"]; + + send({ + type: "Transition", + startVersion: { + ...querySetVersion, + identity: 0, + }, + endVersion: { + ...querySetVersion, + identity: 1, + }, + modifications: [], + }); + + expect((await receive()).type).toEqual("Authenticate"); + + send({ + type: "Transition", + startVersion: { + ...querySetVersion, + identity: 1, + }, + endVersion: { + ...querySetVersion, + identity: 2, + }, + modifications: [], + }); + + // A race condition where a user triggered query with a newly stale + // token triggers an auth error. + send({ + type: "AuthError", + error: "Convex token identity expired", + baseVersion: 2, + }); + close(); + + // The error has now caused reauthentication to begin. Reauth stops the + // connection. The scheduled refetch will have started during the + // 1000ms token fetcher delay, causing the reauth attempt to exit early + // and never restart the connection. + // + // This is the race condition this test covers. The scheduled refetch + // previously would fetch a new token but never restart the connection + // stopped by reauth. + expect((await receive()).type).toEqual("Connect"); + expect((await receive()).type).toEqual("Authenticate"); + expect((await receive()).type).toEqual("ModifyQuerySet"); + + // If the connection is successfully restarted, the client will receive + // the following Transition message and call onChange a second time with + // true for `isAuthenticated`. + send({ + type: "Transition", + startVersion: { + ...querySetVersion, + identity: 0, + }, + endVersion: { + ...querySetVersion, + identity: 1, + }, + modifications: [], + }); + + await client.close(); + + expect(tokenFetcher).toHaveBeenCalledTimes(4); + // Initial setConfig + expect(tokenFetcher).toHaveBeenNthCalledWith(1, { + forceRefreshToken: false, + }); + // Initial fresh token fetch + expect(tokenFetcher).toHaveBeenNthCalledWith(2, { + forceRefreshToken: true, + }); + // Reauth attempt + expect(tokenFetcher).toHaveBeenNthCalledWith(3, { + forceRefreshToken: true, + }); + // Scheduled refetch + expect(tokenFetcher).toHaveBeenNthCalledWith(4, { + forceRefreshToken: true, + }); + + // Confirm that auth state changed exactly twice, and was never + // set to false. + expect(onChange).toHaveBeenCalledTimes(2); + // Initial setConfig + expect(onChange).toHaveBeenNthCalledWith(1, true); + // Refetch after reauth + expect(onChange).toHaveBeenNthCalledWith(2, true); + vi.useRealTimers(); + }); + }); + + // When awaiting server confirmation of a fresh token, a subsequent + // auth error (from an Authenticate request) will cause the client to go to + // an unauthenticated state. This test covers a race condition where an + // Authenticate request for a fresh token is sent, and then the client app + // goes to background and misses the Transition response. If the client + // becomes active after the new token has expired, a new Authenticate request + // will be sent with the expired token, leading to an error response and + // unauthenticated state. + test("Client retries token validation on error", async () => { + await withInMemoryWebSocket(async ({ address, receive, send, close }) => { + const client = testReactClient(address); + const ts = Math.ceil(Date.now() / 1000); + const tokens = [ + jwtEncode({ iat: ts, exp: ts + 60 }, "token1"), + jwtEncode({ iat: ts, exp: ts + 60 }, "token2"), + jwtEncode({ iat: ts, exp: ts + 60 }, "token3"), + ]; + const tokenFetcher = vi.fn(async (_opts) => tokens.shift()!); + const onChange = vi.fn(); + client.setAuth(tokenFetcher, onChange); + + expect((await receive()).type).toEqual("Connect"); + expect((await receive()).type).toEqual("Authenticate"); + expect((await receive()).type).toEqual("ModifyQuerySet"); + + const querySetVersion = client.sync["remoteQuerySet"]["version"]; + + send({ + type: "Transition", + startVersion: { + ...querySetVersion, + identity: 0, + }, + endVersion: { + ...querySetVersion, + identity: 1, + }, + modifications: [], + }); + + expect((await receive()).type).toEqual("Authenticate"); + + // Simulating an auth error while waiting for server confirmation of a + // fresh token. + send({ + type: "AuthError", + error: "bla", + baseVersion: 1, + }); + close(); + + // The client should reattempt reauthentication. + expect((await receive()).type).toEqual("Connect"); + expect((await receive()).type).toEqual("Authenticate"); + expect((await receive()).type).toEqual("ModifyQuerySet"); + + send({ + type: "Transition", + startVersion: { + ...querySetVersion, + identity: 0, + }, + endVersion: { + ...querySetVersion, + identity: 1, + }, + modifications: [], + }); + + // Flush + await new Promise((resolve) => setTimeout(resolve)); + await client.close(); + + expect(tokenFetcher).toHaveBeenCalledTimes(3); + // Initial setConfig + expect(tokenFetcher).toHaveBeenNthCalledWith(1, { + forceRefreshToken: false, + }); + // Initial fresh token fetch + expect(tokenFetcher).toHaveBeenNthCalledWith(2, { + forceRefreshToken: true, + }); + // Reauth second attempt + expect(tokenFetcher).toHaveBeenNthCalledWith(3, { + forceRefreshToken: true, + }); + expect(onChange).toHaveBeenCalledTimes(2); + // Initial setConfig + expect(onChange).toHaveBeenNthCalledWith(1, true); + // Reauth second attempt + expect(onChange).toHaveBeenNthCalledWith(2, true); + }); + }); + test("Authentication runs first", async () => { await withInMemoryWebSocket(async ({ address, receive, send }) => { const client = testReactClient(address);