Skip to content

Commit

Permalink
Clean up codes
Browse files Browse the repository at this point in the history
  • Loading branch information
hackerwins committed Nov 7, 2024
1 parent 4e6f5b4 commit 5402e93
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 82 deletions.
176 changes: 100 additions & 76 deletions packages/sdk/src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,11 @@ export interface ClientOptions {
apiKey?: string;

/**
* `authTokenInjector` is a function to provide a token that will be passed to
* the auth webhook. If the token becomes invalid or expires, this function
* will be called again with the authError parameter, allowing for token refresh.
* `authError` is optional parameter containing error information if the previous
* token was invalid or expired.
* `authTokenInjector` is a function that provides a token for the auth webhook.
* When the webhook response status code is 401, this function is called to refresh the token.
* The `reason` parameter is the reason from the webhook response.
*/
authTokenInjector?: (authErrorMessage?: string) => Promise<string>;
authTokenInjector?: (reason?: string) => Promise<string>;

/**
* `syncLoopDuration` is the duration of the sync loop. After each sync loop,
Expand All @@ -158,6 +156,25 @@ export interface ClientOptions {
*/
reconnectStreamDelay?: number;

// retry 로직은 retryable 인 status code에 대해서 재시도 함
// webhook 서버에서 잘못된 토큰을 발행하면 token refresh를 무한 반복하는 문제가 발생 됨
// 이 무한 반복되는 문제를 해결하기 위해서 최대 제한 수를 제한함

// Q) retry 로직은 어디에 적용해야하는가?
// - client.activate 같은 함수에서 토큰 에러가 날 때 retry하고 있음...
// error retryable? = client.activate(); // 에러 발생
// client.attach(doc);
// - client.sync(doc);
// background routine에 의한 싱크는 이벤트 핸들러로 알려줘야함

// ```
// try {
// const doc = await client.attach(doc); // token refresh
// } catch(e) {
// await client.attach(doc);
// }
// ```

/**
* `retryRequestDelay` defines the waiting time between retry attempts.
* The default value is `1000`(ms).
Expand Down Expand Up @@ -208,17 +225,17 @@ export class Client {
private attachmentMap: Map<DocumentKey, Attachment<unknown, any>>;

private apiKey: string;
private authTokenInjector?: (authErrorMessage?: string) => Promise<string>;
private setAuthToken?: (token: string) => void;
private authTokenInjector?: (reason?: string) => Promise<string>;
private conditions: Record<ClientCondition, boolean>;
private syncLoopDuration: number;
private reconnectStreamDelay: number;
private retrySyncLoopDelay: number;
private retryRequestDelay: number;
private maxRequestRetries: number;

private rpcClient: PromiseClient<typeof YorkieService>;
private rpcAddr: string;
private rpcClient?: PromiseClient<typeof YorkieService>;
private setAuthToken: (token: string) => void;
private taskQueue: Array<() => Promise<any>>;
private processing = false;

Expand Down Expand Up @@ -252,6 +269,18 @@ export class Client {
opts.maxRequestRetries ?? DefaultClientOptions.maxRequestRetries;
this.rpcAddr = rpcAddr;

const { authInterceptor, setToken } = createAuthInterceptor(this.apiKey);
this.setAuthToken = setToken;

// Here we make the client itself, combining the service
// definition with the transport.
this.rpcClient = createPromiseClient(
YorkieService,
createGrpcWebTransport({
baseUrl: this.rpcAddr,
interceptors: [authInterceptor, createMetricInterceptor()],
}),
);
this.taskQueue = [];
}

Expand All @@ -265,29 +294,18 @@ export class Client {
return Promise.resolve();
}

// Here we make the client itself, combining the service
// definition with the transport.
const token = this.authTokenInjector && (await this.authTokenInjector());
const { authInterceptor, setToken } = createAuthInterceptor(
this.apiKey,
token,
);
this.setAuthToken = setToken;

this.rpcClient = createPromiseClient(
YorkieService,
createGrpcWebTransport({
baseUrl: this.rpcAddr,
interceptors: [authInterceptor, createMetricInterceptor()],
}),
);
if (this.authTokenInjector) {
const token = await this.authTokenInjector();
this.setAuthToken(token);
}

return this.enqueueTask(async () => {
const requestActivateClient = async (retryCount = 0): Promise<void> => {
return this.rpcClient!.activateClient(
{ clientKey: this.key },
{ headers: { 'x-shard-key': this.apiKey } },
)
return this.rpcClient
.activateClient(
{ clientKey: this.key },
{ headers: { 'x-shard-key': this.apiKey } },
)
.then((res) => {
this.id = res.clientId;
this.status = ClientStatus.Activated;
Expand Down Expand Up @@ -327,10 +345,11 @@ export class Client {

return this.enqueueTask(async () => {
const requestDeactivateClient = async (retryCount = 0): Promise<void> => {
return this.rpcClient!.deactivateClient(
{ clientId: this.id! },
{ headers: { 'x-shard-key': this.apiKey } },
)
return this.rpcClient
.deactivateClient(
{ clientId: this.id! },
{ headers: { 'x-shard-key': this.apiKey } },
)
.then(() => {
this.deactivateInternal();
logger.info(`[DC] c"${this.getKey()}" deactivated`);
Expand Down Expand Up @@ -406,13 +425,14 @@ export class Client {
const requestAttachDocument = async (
retryCount = 0,
): Promise<Document<T, P>> => {
return this.rpcClient!.attachDocument(
{
clientId: this.id!,
changePack: converter.toChangePack(doc.createChangePack()),
},
{ headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` } },
)
return this.rpcClient
.attachDocument(
{
clientId: this.id!,
changePack: converter.toChangePack(doc.createChangePack()),
},
{ headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` } },
)
.then(async (res) => {
const pack = converter.fromChangePack<P>(res.changePack!);
doc.applyChangePack(pack);
Expand Down Expand Up @@ -525,15 +545,16 @@ export class Client {
const requestDetachDocument = async (
retryCount = 0,
): Promise<Document<T, P>> => {
return this.rpcClient!.detachDocument(
{
clientId: this.id!,
documentId: attachment.docID,
changePack: converter.toChangePack(doc.createChangePack()),
removeIfNotAttached: options.removeIfNotAttached ?? false,
},
{ headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` } },
)
return this.rpcClient
.detachDocument(
{
clientId: this.id!,
documentId: attachment.docID,
changePack: converter.toChangePack(doc.createChangePack()),
removeIfNotAttached: options.removeIfNotAttached ?? false,
},
{ headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` } },
)
.then((res) => {
const pack = converter.fromChangePack<P>(res.changePack!);
doc.applyChangePack(pack);
Expand Down Expand Up @@ -703,14 +724,15 @@ export class Client {
pbChangePack.isRemoved = true;

return this.enqueueTask(async () => {
return this.rpcClient!.removeDocument(
{
clientId: this.id!,
documentId: attachment.docID,
changePack: pbChangePack,
},
{ headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` } },
)
return this.rpcClient
.removeDocument(
{
clientId: this.id!,
documentId: attachment.docID,
changePack: pbChangePack,
},
{ headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` } },
)
.then((res) => {
const pack = converter.fromChangePack<P>(res.changePack!);
doc.applyChangePack(pack);
Expand Down Expand Up @@ -807,15 +829,16 @@ export class Client {

const doLoop = async (): Promise<any> => {
return this.enqueueTask(async () => {
return this.rpcClient!.broadcast(
{
clientId: this.id!,
documentId: attachment.docID,
topic,
payload: new TextEncoder().encode(JSON.stringify(payload)),
},
{ headers: { 'x-shard-key': `${this.apiKey}/${docKey}` } },
)
return this.rpcClient
.broadcast(
{
clientId: this.id!,
documentId: attachment.docID,
topic,
payload: new TextEncoder().encode(JSON.stringify(payload)),
},
{ headers: { 'x-shard-key': `${this.apiKey}/${docKey}` } },
)
.then(() => {
logger.info(
`[BC] c:"${this.getKey()}" broadcasts d:"${docKey}" t:"${topic}"`,
Expand Down Expand Up @@ -915,7 +938,7 @@ export class Client {
}

const ac = new AbortController();
const stream = this.rpcClient!.watchDocument(
const stream = this.rpcClient.watchDocument(
{
clientId: this.id!,
documentId: attachment.docID,
Expand Down Expand Up @@ -1052,15 +1075,16 @@ export class Client {
const requestPushPullChanges = async (
retryCount = 0,
): Promise<Document<T, P>> => {
return this.rpcClient!.pushPullChanges(
{
clientId: this.id!,
documentId: docID,
changePack: converter.toChangePack(reqPack),
pushOnly: syncMode === SyncMode.RealtimePushOnly,
},
{ headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` } },
)
return this.rpcClient
.pushPullChanges(
{
clientId: this.id!,
documentId: docID,
changePack: converter.toChangePack(reqPack),
pushOnly: syncMode === SyncMode.RealtimePushOnly,
},
{ headers: { 'x-shard-key': `${this.apiKey}/${doc.getKey()}` } },
)
.then((res) => {
const respPack = converter.fromChangePack<P>(res.changePack!);

Expand Down
12 changes: 6 additions & 6 deletions packages/sdk/test/integration/webhook_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,8 @@ describe('Auth Webhook', () => {
task,
}) => {
const TokenExpirationMs = 500;
const authTokenInjector = vi.fn(async (authErrorMessage) => {
if (authErrorMessage === ExpiredTokenErrorMessage) {
const authTokenInjector = vi.fn(async (reason) => {
if (reason === ExpiredTokenErrorMessage) {
return `token-${Date.now() + TokenExpirationMs}`;
}
return `token-${Date.now() - TokenExpirationMs}`; // token expired
Expand Down Expand Up @@ -310,8 +310,8 @@ describe('Auth Webhook', () => {
);

const TokenExpirationMs = 500;
const authTokenInjector = vi.fn(async (authErrorMessage) => {
if (authErrorMessage === ExpiredTokenErrorMessage) {
const authTokenInjector = vi.fn(async (reason) => {
if (reason === ExpiredTokenErrorMessage) {
return `token-${Date.now() + TokenExpirationMs}`;
}
return `token-${Date.now()}`;
Expand Down Expand Up @@ -388,8 +388,8 @@ describe('Auth Webhook', () => {
);

const TokenExpirationMs = 2000;
const authTokenInjector = vi.fn(async (authErrorMessage) => {
if (authErrorMessage === ExpiredTokenErrorMessage) {
const authTokenInjector = vi.fn(async (reason) => {
if (reason === ExpiredTokenErrorMessage) {
return `token-${Date.now() + TokenExpirationMs}`;
}
return `token-${Date.now()}`;
Expand Down

0 comments on commit 5402e93

Please sign in to comment.