Skip to content

Commit

Permalink
Enable Record Sync 🚀
Browse files Browse the repository at this point in the history
Enable record sync for query history and load history in the core app and in the web extension

Add socket server to allow sync events to reach all devices and be shared in real-time with the chrome extension

Add chrome extension information to the home page
  • Loading branch information
paustint committed Feb 3, 2025
1 parent 71cd3ea commit 34349bd
Show file tree
Hide file tree
Showing 103 changed files with 3,430 additions and 900 deletions.
4 changes: 3 additions & 1 deletion Dockerfile.e2e
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ COPY ./prisma ./prisma/
RUN yarn

# Install other dependencies that were not calculated by nx, but are required
RUN yarn add dotenv prisma@^3.13.0
# Install matching version of prisma by extracting @prisma/client version from package.json,
# and stripping the caret ("^") if present.
RUN yarn add prisma@$(node -p "require('./package.json')['dependencies']['@prisma/client'].replace('^','')")

# Generate prisma client - ensure that there are no OS differences
RUN npx prisma generate
Expand Down
91 changes: 91 additions & 0 deletions apps/api/src/app/controllers/data-sync.controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import { ensureBoolean, REGEX } from '@jetstream/shared/utils';
import { SyncRecordOperationSchema } from '@jetstream/types';
import { parseISO } from 'date-fns';
import { clamp } from 'lodash';
import { z } from 'zod';
import * as userSyncDbService from '../db/data-sync.db';
import { emitEventsToOtherClients, SyncEvent } from '../services/data-sync-broadcast.service';
import { sendJson } from '../utils/response.handlers';
import { createRoute } from '../utils/route.utils';

export const routeDefinition = {
pull: {
controllerFn: () => pull,
validators: {
query: z.object({
updatedAt: z
.string()
.regex(REGEX.ISO_DATE)
.nullish()
.transform((val) => (val ? parseISO(val) : null)),
limit: z.coerce
.number()
.int()
.optional()
.default(userSyncDbService.MAX_PULL)
.transform((val) => clamp(val, userSyncDbService.MIN_PULL, userSyncDbService.MAX_PULL)),
/**
* Used for pagination, if there are more records, this is the last key of the previous page
*/
lastKey: z.string().nullish(),
}),
hasSourceOrg: false,
},
},
push: {
controllerFn: () => push,
validators: {
query: z.object({
clientId: z.string().uuid(),
updatedAt: z
.string()
.regex(REGEX.ISO_DATE)
.nullish()
.transform((val) => (val ? parseISO(val) : null)),
includeAllIfUpdatedAtNull: z
.union([z.enum(['true', 'false']), z.boolean()])
.optional()
.default(false)
.transform(ensureBoolean),
}),
body: SyncRecordOperationSchema.array().max(userSyncDbService.MAX_SYNC),
hasSourceOrg: false,
},
},
};

/**
* Pull changes from server
*/
const pull = createRoute(routeDefinition.pull.validators, async ({ user, query }, req, res) => {
const { lastKey, updatedAt, limit } = query;
const response = await userSyncDbService.findByUpdatedAt({
userId: user.id,
lastKey,
updatedAt,
limit,
});
sendJson(res, response);
});

/**
* Push changes to server and emit to any other clients the user has active
*/
const push = createRoute(routeDefinition.push.validators, async ({ user, body: records, query }, req, res) => {
const response = await userSyncDbService.syncRecordChanges({
updatedAt: query.updatedAt,
userId: user.id,
records,
includeAllIfUpdatedAtNull: query.includeAllIfUpdatedAtNull,
});

const syncEvent: SyncEvent = {
clientId: query.clientId,
data: { keys: response.records.map(({ key }) => key) },
userId: user.id,
};

emitEventsToOtherClients(req.session.id, syncEvent);

sendJson(res, response);
});
203 changes: 124 additions & 79 deletions apps/api/src/app/controllers/socket.controller.ts
Original file line number Diff line number Diff line change
@@ -1,122 +1,167 @@
import { getExceptionLog, logger } from '@jetstream/api-config';
import { ENV, getExceptionLog, logger } from '@jetstream/api-config';
import { convertUserProfileToSession } from '@jetstream/auth/server';
import { UserProfileSession } from '@jetstream/auth/types';
import * as cometdClient from 'cometd-nodejs-client';
import { HTTP } from '@jetstream/shared/constants';
import { isChromeExtension } from '@jetstream/shared/ui-utils';
import { SocketEvent } from '@jetstream/types';
import { createAdapter } from '@socket.io/cluster-adapter';
import * as express from 'express';
import { IncomingMessage, createServer } from 'http';
import { nanoid } from 'nanoid';
import { Server, Socket } from 'socket.io';
import { ExtendedError } from 'socket.io/dist/namespace';
import cluster from 'node:cluster';
import { Server } from 'socket.io';
import { DefaultEventsMap } from 'socket.io/dist/typed-events';
import { environment } from '../../environments/environment';
import * as socketUtils from '../utils/socket-utils';

cometdClient.adapt();
import * as webExtensionService from '../services/auth-web-extension.service';
import { Request, Response } from '../types/types';

let io: Server<DefaultEventsMap, DefaultEventsMap, DefaultEventsMap>;

const wrapMiddleware =
(middleware) => (socket: Socket<DefaultEventsMap, DefaultEventsMap, DefaultEventsMap>, next: (err?: ExtendedError) => void) =>
middleware(socket.request, {}, next);

function getUser(socket: Socket<DefaultEventsMap, DefaultEventsMap, DefaultEventsMap>) {
function getUser(request: IncomingMessage) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const user = (socket.request as any)?.user as UserProfileSession;
const user = (request as any)?.session?.user as UserProfileSession | undefined;
return user;
}

async function getWebExtensionUser(request: IncomingMessage) {
if (request.headers.origin !== `chrome-extension://${ENV.WEB_EXTENSION_ID}`) {
return;
}
const authorizationHeader = request.headers[HTTP.HEADERS.AUTHORIZATION.toLowerCase()] as string;
const deviceId = request.headers[HTTP.HEADERS.X_WEB_EXTENSION_DEVICE_ID.toLowerCase()] as string;
if (!authorizationHeader || !deviceId || !authorizationHeader.startsWith('Bearer ')) {
return;
}
const accessToken = authorizationHeader.split(' ')[1];
const user = await webExtensionService
.verifyToken({ token: accessToken, deviceId })
.then((decodedJwt) => convertUserProfileToSession(decodedJwt.userProfile));

(request as any).session = { ...(request as any).session, user, deviceId };

return user;
}

// eslint-disable-next-line @typescript-eslint/no-unused-vars
function isValidRequest(req: IncomingMessage) {
// req.// TODO: make sure origin matches
// const noOriginHeader = req.headers.origin === undefined;
// could make sure origin matches
// potentially do auth check if possible? (may not be possible)
return true;
export function emitSocketEvent({
userId,
event,
exceptRooms,
payload,
}: {
userId: string;
event: SocketEvent;
exceptRooms?: string[];
payload?: unknown;
}) {
try {
let broadcastOperator = io.to(userId);
if (exceptRooms) {
broadcastOperator = broadcastOperator.except(exceptRooms);
}
broadcastOperator.emit(event, payload);
} catch (ex) {
logger.error({ ...getExceptionLog(ex), userId, event }, 'Error emitting socket event');
}
}

export function initSocketServer(app: express.Express, middlewareFns: express.RequestHandler[]) {
export function initSocketServer(
app: express.Express,
middlewareFns: {
sessionMiddleware: express.RequestHandler;
}
) {
const httpServer = createServer(app);

io = new Server(httpServer, {
serveClient: false,
cookie: {
httpOnly: false,
secure: environment.production,
sameSite: 'strict',
// eslint-disable-next-line @typescript-eslint/no-explicit-any
} as any,
allowRequest: (req, callback) => {
const isOriginValid = isValidRequest(req);
callback(null, isOriginValid);
cors: {
origin: [`chrome-extension://${ENV.WEB_EXTENSION_ID}`],
methods: ['GET', 'POST'],
allowedHeaders: [HTTP.HEADERS.AUTHORIZATION, HTTP.HEADERS.X_WEB_EXTENSION_DEVICE_ID],
credentials: true,
},
cookie: isChromeExtension()
? undefined
: {
name: 'socketSid',
httpOnly: false,
secure: environment.production,
sameSite: 'strict',
},
allowRequest: async (req, callback) => {
try {
// normal session
let user = getUser(req);
if (user) {
callback(null, true);
return;
}
// web-extension session
user = await getWebExtensionUser(req);
if (user) {
callback(null, true);
return;
}
// Unauthorized
logger.warn('[SOCKET][ERROR] unauthorized');
callback('Unauthorized', false);
return;
} catch (ex) {
logger.warn('[SOCKET][ERROR] error authorizing', ex);
callback('Unauthorized', false);
return;
}
},
});

// eslint-disable-next-line @typescript-eslint/no-unused-vars
io.engine.generateId = (...args: unknown[]) => {
return nanoid(); // must be unique across all Socket.IO servers
};

// can we do anything here?
// io.engine.on("initial_headers", (headers, req) => {
// headers["test"] = "123";
// headers["set-cookie"] = "mycookie=456";
// });

// can we do anything here?
// io.engine.on("headers", (headers, req) => {
// headers["test"] = "789";
// });

// Possible app structures
// https://socket.io/docs/v4/server-application-structure/
if (cluster.isWorker) {
io.adapter(createAdapter());
}

middlewareFns.forEach((middleware) => io.use(wrapMiddleware(middleware)));

io.use((socket, next) => {
const user = getUser(socket);
if (user) {
io.engine.use((req: Request, res: Response, next: express.NextFunction) => {
// the chrome extension does not include cookies and hangs on this middleware
if (req.headers.origin === `chrome-extension://${ENV.WEB_EXTENSION_ID}`) {
next();
} else {
logger.debug('[SOCKET][ERROR] unauthorized');
next(new Error('unauthorized'));
middlewareFns.sessionMiddleware(req, res, next);
}
});

io.on('connection', (socket) => {
const user = getUser(socket);
const userSocketState: socketUtils.SocketConnectionState = {
io,
user,
socket,
cometdConnections: {},
};
logger.debug({ socketId: socket.id, userId: user?.id || 'unknown' }, '[SOCKET][CONNECT] %s', socket.id);
if (user) {
socket.join(user.id);
const session = (socket.request as any)?.session;
const sessionId = session?.id as string | undefined;
const userId = session?.user?.id as string | undefined;
const deviceId = session?.deviceId as string | undefined;

logger.trace(
{ socketId: socket.id, userId: session?.user?.id || 'unknown', sessionId: session?.id },
'[SOCKET][CONNECT] %s',
socket.id
);

if (userId) {
socket.join(userId);
}

if (sessionId) {
socket.join(sessionId);
}

if (deviceId) {
socket.join(deviceId);
}

// server namespace disconnect, client namespace disconnect, server shutting down, ping timeout, transport close, transport error
socket.on('disconnect', (reason) => {
logger.debug({ socketId: socket.id, userId: user?.id || 'unknown' }, '[SOCKET][DISCONNECT] %s', reason);
// TODO: should we distinguish specific reason for disconnect before unsubscribing from cometd?
// If browser did not really disconnect, how will it know that it is no longer subscribed to cometd?
Object.values(userSocketState.cometdConnections).forEach(({ cometd, subscriptions }) => {
if (cometd) {
socketUtils.disconnectCometD(cometd, socket, user);
}
subscriptions.clear();
});
userSocketState.cometdConnections = {};
logger.trace({ socketId: socket.id, userId: userId || 'unknown' }, '[SOCKET][DISCONNECT] %s', reason);
});

socket.on('error', (err) => {
logger.warn({ socketId: socket.id, userId: user?.id || 'unknown', ...getExceptionLog(err) }, '[SOCKET][ERROR] %s', err.message);
logger.error({ socketId: socket.id, userId: userId || 'unknown', ...getExceptionLog(err) }, '[SOCKET][ERROR] %s', err.message);
});

/**
* TODO: add socket handlers here - these were removed since not actually needed but are good reference
*/
// socket.on(SOCKET_EVENTS.PLATFORM_EVENT_SUBSCRIBE, platformEvService.subscribeToPlatformEvent(userSocketState));
// socket.on(SOCKET_EVENTS.PLATFORM_EVENT_UNSUBSCRIBE, platformEvService.unsubscribeFromPlatformEvent(userSocketState));
});

return httpServer;
Expand Down
Loading

0 comments on commit 34349bd

Please sign in to comment.