Skip to content

Commit

Permalink
fix: apply a fix to resolve memory leak from subscription promise lea…
Browse files Browse the repository at this point in the history
…kage (#3241)

Co-authored-by: Vinh <[email protected]>
  • Loading branch information
wyattjoh and cvle authored Oct 28, 2020
1 parent 3743f89 commit 45868ae
Showing 1 changed file with 58 additions and 27 deletions.
85 changes: 58 additions & 27 deletions src/core/server/graph/resolvers/Subscription/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
import { GraphQLResolveInfo } from "graphql";
import { withFilter } from "graphql-subscriptions";

import GraphContext from "../../context";
import { SUBSCRIPTION_CHANNELS, SubscriptionPayload } from "./types";

type ResolverFn<TParent, TArgs, TContext> = (
parent: TParent,
args: TArgs,
context: TContext,
info: GraphQLResolveInfo
) => AsyncIterable<any>;

type FilterFn<TParent, TArgs, TContext> = (
parent: TParent,
args: TArgs,
Expand All @@ -23,13 +29,51 @@ interface SubscriptionResolver<TParent, TArgs, TResult> {
resolve: Resolver<TParent, TArgs, TParent>;
}

export function createTenantAsyncIterator<TParent, TArgs, TResult>(
/**
* withFilter applies a filter to a async iterator.
*
* This duplicates the functionality of the withFilter function provided by the
* `graphql-subscriptions` package without the memory leak as it uses native
* async iterators instead.
*
* Solution provided by @brettjashford.
*
* https://github.com/apollographql/graphql-subscriptions/pull/209#issuecomment-713906710
*
* @param asyncIteratorFn the async iterator to use that's provided by the transport
* @param filterFn the filter to apply for each iteration to check to see if we should sent it
*/
function withFilter<TParent, TArgs>(
asyncIteratorFn: ResolverFn<TParent, TArgs, GraphContext>,
filterFn: FilterFn<TParent, TArgs, GraphContext>
) {
return async function* (
source: TParent,
args: TArgs,
ctx: GraphContext,
info: GraphQLResolveInfo
) {
const asyncIterator = asyncIteratorFn(source, args, ctx, info);
for await (const payload of asyncIterator) {
if (await filterFn(payload, args, ctx, info)) {
yield payload;
}
}
};
}

function createTenantAsyncIterator<TParent, TArgs, TResult>(
channel: SUBSCRIPTION_CHANNELS
): Resolver<TParent, TArgs, AsyncIterator<TResult>> {
): Resolver<TParent, TArgs, AsyncIterable<TResult>> {
return (source, args, ctx) =>
ctx.pubsub.asyncIterator<TResult>(
// This is already technically returning an AsyncIterable, the Typescript
// types are in fact wrong:
//
// https://github.com/davidyaha/graphql-redis-subscriptions/pull/255
//
(ctx.pubsub.asyncIterator<TResult>(
createSubscriptionChannelName(ctx.tenant.id, channel)
);
) as unknown) as AsyncIterable<TResult>;
}

export function createSubscriptionChannelName(
Expand All @@ -40,7 +84,7 @@ export function createSubscriptionChannelName(
}

/**
* defaultFilterFn will perform filtering operations on the subscription
* clientIDFilterFn will perform filtering operations on the subscription
* responses to ensure that mutations issued by one user is not sent back as a
* subscription to the same requesting User, as they already implement the
* update via the mutation response.
Expand All @@ -53,7 +97,7 @@ export function createSubscriptionChannelName(
* need to determine eligibility to send the subscription back or
* not.
*/
export function defaultFilterFn<TParent extends SubscriptionPayload, TArgs>(
export function clientIDFilterFn<TParent extends SubscriptionPayload, TArgs>(
source: TParent,
args: TArgs,
ctx: GraphContext
Expand All @@ -65,28 +109,15 @@ export function defaultFilterFn<TParent extends SubscriptionPayload, TArgs>(
return true;
}

/**
* Ensure that even when we're provided with a domain specific filtering
* function we respect the subscription id that is sent back with the request to
* prevent double responses.
*/
export function createFilterFn<TParent, TArgs>(
filter?: FilterFn<TParent, TArgs, GraphContext>
function composeFilters<TParent, TArgs>(
...filters: Array<FilterFn<TParent, TArgs, GraphContext>>
): FilterFn<TParent, TArgs, GraphContext> {
return filter
? // Combine the filters, preferring the defaultFilterFn first.
(source, args, ctx, info) => {
if (!defaultFilterFn(source, args, ctx)) {
return false;
}

return filter(source, args, ctx, info);
}
: defaultFilterFn;
return (source, args, ctx, info) =>
filters.every((filter) => filter(source, args, ctx, info));
}

export interface CreateIteratorInput<TParent, TArgs, TResult> {
filter?: FilterFn<TParent, TArgs, GraphContext>;
filter: FilterFn<TParent, TArgs, GraphContext>;
}

export function createIterator<
Expand All @@ -95,12 +126,12 @@ export function createIterator<
TResult
>(
channel: SUBSCRIPTION_CHANNELS,
{ filter }: CreateIteratorInput<TParent, TArgs, TResult> = {}
{ filter }: CreateIteratorInput<TParent, TArgs, TResult>
): SubscriptionResolver<TParent, TArgs, TResult> {
return {
subscribe: withFilter(
createTenantAsyncIterator(channel),
createFilterFn(filter)
composeFilters(clientIDFilterFn, filter)
),
resolve: (payload) => payload,
};
Expand Down

0 comments on commit 45868ae

Please sign in to comment.