Skip to content
This repository has been archived by the owner on Apr 17, 2023. It is now read-only.

Commit

Permalink
fix: Add results processor abstraction
Browse files Browse the repository at this point in the history
  • Loading branch information
wtrocki committed May 17, 2019
1 parent c9821f2 commit 50e64bf
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 55 deletions.
32 changes: 32 additions & 0 deletions packages/sync/src/cache/IDProcessor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { IResultProcessor } from "../offline/procesors/IResultProcessor";
import { OperationQueueEntry } from "../offline/OperationQueueEntry";
import { FetchResult } from "apollo-link";
import { isClientGeneratedId } from "./createOptimisticResponse";

/**
* Allow updates on items created while offline.
* If item is created while offline and client generated ID is provided
* to optimisticResponse, later mutations on this item will be using this client
* generated ID. Once any create operation is successful, we should
* update entries in queue with ID returned from server.
*/
export class IDProcessor implements IResultProcessor {

public execute(queue: OperationQueueEntry[], entry: OperationQueueEntry, result: FetchResult) {
const { operation: { operationName }, optimisticResponse } = entry;
if (!result ||
!optimisticResponse ||
!optimisticResponse[operationName] ||
!isClientGeneratedId(optimisticResponse[operationName].id)) {
return;
}

const clientId = optimisticResponse && optimisticResponse[operationName].id;

queue.forEach(({ operation: op }) => {
if (op.variables.id === clientId) {
op.variables.id = result.data && result.data[operationName].id;
}
});
}
}
36 changes: 36 additions & 0 deletions packages/sync/src/conflicts/ConflictProcesor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { IResultProcessor } from "../offline/procesors/IResultProcessor";
import { OperationQueueEntry } from "../offline/OperationQueueEntry";
import { ObjectState } from "./ObjectState";
import { FetchResult } from "apollo-link";

/**
* Manipulate state of item that is being used for conflict resolution purposes.
* This is required for the queue items so that we do not get a conflict with ourself
* @param entry the operation which returns the result we compare with first queue entry
*/
export class ConflictProcessor implements IResultProcessor {
constructor(private state: ObjectState) {
}

public execute(queue: OperationQueueEntry[],
entry: OperationQueueEntry, result: FetchResult): void {
const { operation: { operationName } } = entry;
if (!result || !this.state) {
return;
}

if (result.data && result.data[operationName]) {
for (const { operation: op } of queue) {
if (op.variables.id === entry.operation.variables.id
&& op.operationName === entry.operation.operationName) {
const opVersion = this.state.currentState(op.variables);
const prevOpVersion = this.state.currentState(entry.operation.variables);
if (opVersion === prevOpVersion) {
op.variables = this.state.nextState(op.variables);
break;
}
}
}
}
}
}
11 changes: 9 additions & 2 deletions packages/sync/src/links/LinksBuilder.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { ApolloLink, Operation } from "apollo-link";
import { HttpLink } from "apollo-link-http";
import { RetryLink } from "apollo-link-retry";
import { conflictLink } from "../conflicts";
import { conflictLink, ObjectState } from "../conflicts";
import { DataSyncConfig } from "../config";
import { createAuthLink } from "./AuthLink";
import { AuditLoggingLink } from "./AuditLoggingLink";
Expand All @@ -12,6 +12,9 @@ import { isMutation, isOnlineOnly, isSubscription } from "../utils/helpers";
import { defaultWebSocketLink } from "./WebsocketLink";
import { OfflineLink } from "../offline/OfflineLink";
import { NetworkStatus, OfflineMutationsHandler, OfflineStore } from "../offline";
import { IDProcessor } from "../cache/IDProcessor";
import { ConflictProcessor } from "../conflicts/ConflictProcesor";
import { IResultProcessor } from "../offline/procesors/IResultProcessor";

/**
* Method for creating "uber" composite Apollo Link implementation including:
Expand All @@ -36,11 +39,15 @@ export const createDefaultLink = async (config: DataSyncConfig, offlineLink: Apo
* Create offline link
*/
export const createOfflineLink = async (config: DataSyncConfig, store: OfflineStore) => {
const resultProcessors: IResultProcessor[] = [
new IDProcessor(),
new ConflictProcessor(config.conflictStateProvider as ObjectState)
];
return new OfflineLink({
store,
listener: config.offlineQueueListener,
networkStatus: config.networkStatus as NetworkStatus,
conflictStateProvider: config.conflictStateProvider
resultProcessors
});
};

Expand Down
3 changes: 2 additions & 1 deletion packages/sync/src/offline/OfflineLink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ import { ObjectState } from "../conflicts";
import * as debug from "debug";
import { QUEUE_LOGGER } from "../config/Constants";
import { OfflineError } from "./OfflineError";
import { IResultProcessor } from "./procesors/IResultProcessor";

export const logger = debug.default(QUEUE_LOGGER);

export interface OfflineLinkOptions {
networkStatus: NetworkStatus;
store: OfflineStore;
listener?: OfflineQueueListener;
conflictStateProvider?: ObjectState;
resultProcessors?: IResultProcessor[];
}

/**
Expand Down
65 changes: 13 additions & 52 deletions packages/sync/src/offline/OfflineQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { ObjectState } from "../conflicts/ObjectState";
import { Operation, NextLink, Observable, FetchResult } from "apollo-link";
import { OfflineStore } from "./storage/OfflineStore";
import { OfflineLinkOptions } from "../links";
import { IResultProcessor } from "./procesors/IResultProcessor";

export type OperationQueueChangeHandler = (entry: OperationQueueEntry) => void;

Expand All @@ -22,11 +23,12 @@ export class OfflineQueue {
private readonly listener?: OfflineQueueListener;
private readonly state?: ObjectState;
private store: OfflineStore;
private resultProcessors: IResultProcessor[] | undefined;

constructor(options: OfflineLinkOptions) {
this.store = options.store;
this.listener = options.listener;
this.state = options.conflictStateProvider;
this.resultProcessors = options.resultProcessors;
}

/**
Expand Down Expand Up @@ -84,6 +86,15 @@ export class OfflineQueue {
}
}

public executeResultProcessors(op: OperationQueueEntry,
result: FetchResult<any>) {
if (this.resultProcessors) {
for (const resultProcesor of this.resultProcessors) {
resultProcesor.execute(this.queue, op, result);
}
}
}

private onForwardError(op: OperationQueueEntry, error: any) {
if (this.listener && this.listener.onOperationFailure) {
this.listener.onOperationFailure(op.operation, undefined, op.networkError);
Expand All @@ -105,8 +116,7 @@ export class OfflineQueue {
if (this.listener && this.listener.onOperationSuccess) {
this.listener.onOperationSuccess(op.operation, result.data);
}
this.updateIds(op, result);
this.updateObjectState(op, result);
this.executeResultProcessors(op, result);
}
if (entry) {
this.store.removeEntry(entry);
Expand All @@ -119,53 +129,4 @@ export class OfflineQueue {
}
}

/**
* Allow updates on items created while offline.
* If item is created while offline and client generated ID is provided
* to optimisticResponse, later mutations on this item will be using this client
* generated ID. Once any create operation is successful, we should
* update entries in queue with ID returned from server.
*/
private updateIds(entry: OperationQueueEntry, result: FetchResult) {
const { operation: { operationName }, optimisticResponse } = entry;
if (!result ||
!optimisticResponse ||
!optimisticResponse[operationName] ||
!isClientGeneratedId(optimisticResponse[operationName].id)) {
return;
}

const clientId = optimisticResponse && optimisticResponse[operationName].id;

this.queue.forEach(({ operation: op }) => {
if (op.variables.id === clientId) {
op.variables.id = result.data && result.data[operationName].id;
}
});
}

/**
* Manipulate state of item that is being used for conflict resolution purposes.
* This is required for the queue items so that we do not get a conflict with ourself
* @param entry the operation which returns the result we compare with first queue entry
*/
private updateObjectState(entry: OperationQueueEntry, result: FetchResult) {
const { operation: { operationName } } = entry;
if (!result || !this.state) {
return;
}

if (result.data && result.data[operationName]) {
for (const { operation: op } of this.queue) {
if (op.variables.id === entry.operation.variables.id && op.operationName === entry.operation.operationName) {
const opVersion = this.state.currentState(op.variables);
const prevOpVersion = this.state.currentState(entry.operation.variables);
if (opVersion === prevOpVersion) {
op.variables = this.state.nextState(op.variables);
break;
}
}
}
}
}
}
21 changes: 21 additions & 0 deletions packages/sync/src/offline/procesors/IResultProcessor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { FetchResult } from "apollo-link";
import { OperationQueueEntry } from "../OperationQueueEntry";
import { isClientGeneratedId } from "../..";

/**
* Interface that can be used to perform operation on result data for offline queue.
*
* @see IDProcessor
* @see ConflictProcessor
*/
export interface IResultProcessor {

/**
* Process operation and queue
*
* @param queue
* @param op
* @param result
*/
execute(queue: OperationQueueEntry[], op: OperationQueueEntry, result: FetchResult<any>): void;
}

0 comments on commit 50e64bf

Please sign in to comment.