Skip to content

feat: Automatically retry failed webhook deliveries #745

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .eslintrc.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions .gitattributes

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions .gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions .projen/files.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions .projen/tasks.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions package.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 49 additions & 4 deletions src/lambda-github.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { createHash } from 'crypto';
import { createAppAuth } from '@octokit/auth-app';
import { Octokit } from '@octokit/rest';
import { getSecretValue, getSecretJsonValue } from './lambda-helpers';
import { getSecretJsonValue, getSecretValue } from './lambda-helpers';

export function baseUrlFromDomain(domain: string): string {
if (domain == 'github.com') {
Expand Down Expand Up @@ -36,18 +36,23 @@ export async function getOctokit(installationId?: number): Promise<{ octokit: Oc
try {
// Test if the cached octokit is still valid
await cached.rest.meta.getOctocat();
console.log('Using cached octokit');
console.log({
notice: 'Using cached octokit',
});
return {
octokit: cached,
githubSecrets,
};
} catch (e) {
console.log('Octokit cache is invalid', e);
console.log({
notice: 'Octokit cache is invalid',
error: e,
});
octokitCache.delete(cacheKey);
}
}

let baseUrl = baseUrlFromDomain(githubSecrets.domain);
const baseUrl = baseUrlFromDomain(githubSecrets.domain);

let token;
if (githubSecrets.personalAuthToken) {
Expand Down Expand Up @@ -84,6 +89,32 @@ export async function getOctokit(installationId?: number): Promise<{ octokit: Oc
};
}

// This function is used to get the Octokit instance for the app itself, not for a specific installation.
// With PAT authentication, it returns undefined.
export async function getAppOctokit() {
if (!process.env.GITHUB_SECRET_ARN || !process.env.GITHUB_PRIVATE_KEY_SECRET_ARN) {
throw new Error('Missing environment variables');
}

const githubSecrets: GitHubSecrets = await getSecretJsonValue(process.env.GITHUB_SECRET_ARN);
const baseUrl = baseUrlFromDomain(githubSecrets.domain);

if (githubSecrets.personalAuthToken || !githubSecrets.appId) {
return undefined;
}

const privateKey = await getSecretValue(process.env.GITHUB_PRIVATE_KEY_SECRET_ARN);

return new Octokit({
baseUrl,
authStrategy: createAppAuth,
auth: {
appId: githubSecrets.appId,
privateKey: privateKey,
},
});
}

export async function getRunner(octokit: Octokit, runnerLevel: RunnerLevel, owner: string, repo: string, name: string) {
let page = 1;
while (true) {
Expand Down Expand Up @@ -130,3 +161,17 @@ export async function deleteRunner(octokit: Octokit, runnerLevel: RunnerLevel, o
});
}
}

export async function redeliver(octokit: Octokit, deliveryId: number): Promise<void> {
const response = await octokit.rest.apps.redeliverWebhookDelivery({
delivery_id: deliveryId,
});

if (response.status !== 202) {
throw new Error(`Failed to redeliver webhook delivery with ID ${deliveryId}`);
}
console.log({
notice: 'Successfully redelivered webhook delivery',
deliveryId,
});
}
2 changes: 1 addition & 1 deletion src/lambda-helpers.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { SecretsManagerClient, GetSecretValueCommand, UpdateSecretCommand } from '@aws-sdk/client-secrets-manager';
import { GetSecretValueCommand, SecretsManagerClient, UpdateSecretCommand } from '@aws-sdk/client-secrets-manager';

export interface StepFunctionLambdaInput {
readonly owner: string;
Expand Down
18 changes: 18 additions & 0 deletions src/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import { StatusFunction } from './status-function';
import { TokenRetrieverFunction } from './token-retriever-function';
import { singletonLogGroup, SingletonLogType } from './utils';
import { GithubWebhookHandler } from './webhook';
import { GithubWebhookRedelivery } from './webhook-redelivery';


/**
Expand Down Expand Up @@ -260,6 +261,7 @@ export class GitHubRunners extends Construct implements ec2.IConnectable {
readonly connections: ec2.Connections;

private readonly webhook: GithubWebhookHandler;
private readonly redeliverer: GithubWebhookRedelivery;
private readonly orchestrator: stepfunctions.StateMachine;
private readonly setupUrl: string;
private readonly extraLambdaEnv: { [p: string]: string } = {};
Expand Down Expand Up @@ -315,6 +317,9 @@ export class GitHubRunners extends Construct implements ec2.IConnectable {
}),
requireSelfHostedLabel: this.props?.requireSelfHostedLabel ?? true,
});
this.redeliverer = new GithubWebhookRedelivery(this, 'Webhook Redelivery', {
secrets: this.secrets,
});

this.setupUrl = this.setupFunction();
this.statusFunction();
Expand Down Expand Up @@ -864,5 +869,18 @@ export class GitHubRunners extends Construct implements ec2.IConnectable {
limit: 100,
}),
});

new logs.QueryDefinition(this, 'Webhook redeliveries', {
queryDefinitionName: 'GitHub Runners/Webhook redeliveries',
logGroups: [this.redeliverer.handler.logGroup],
queryString: new logs.QueryString({
fields: ['@timestamp', 'message.notice', 'message.deliveryId', 'message.guid'],
filterStatements: [
'isPresent(message.deliveryId)',
],
sort: '@timestamp desc',
limit: 100,
}),
});
}
}
2 changes: 1 addition & 1 deletion src/webhook-handler.lambda.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ export async function handler(event: AWSLambda.APIGatewayProxyEventV2): Promise<
if (getHeader(event, 'x-github-event') !== 'workflow_job') {
console.error(`This webhook only accepts workflow_job, got ${getHeader(event, 'x-github-event')}`);
return {
statusCode: 400,
statusCode: 200,
body: 'Expecting workflow_job',
};
}
Expand Down
26 changes: 26 additions & 0 deletions src/webhook-redelivery-function.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

141 changes: 141 additions & 0 deletions src/webhook-redelivery.lambda.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import { Octokit } from '@octokit/rest';
import { getAppOctokit, redeliver } from './lambda-github';

/**
* Get webhook delivery failures since the last processed delivery ID.
*
* @internal
*/
async function newDeliveryFailures(octokit: Octokit, sinceId: number) {
const deliveries: Map<string, { id: number; deliveredAt: Date; redelivery: boolean }> = new Map();
const successfulDeliveries: Set<string> = new Set();
const timeLimitMs = 1000 * 60 * 30; // don't look at deliveries over 30 minutes old
let lastId = 0;

for await (const response of octokit.paginate.iterator('GET /app/hook/deliveries')) {
if (response.status !== 200) {
throw new Error('Failed to fetch webhook deliveries');
}

for (const delivery of response.data) {
const deliveredAt = new Date(delivery.delivered_at);
const success = delivery.status === 'OK';

if (delivery.id < sinceId) {
// stop processing if we reach the last processed delivery ID
console.debug({
notice: 'Reached last processed delivery ID',
sinceId: sinceId,
deliveryId: delivery.id,
guid: delivery.guid,
});
return { deliveries, lastId };
}

lastId = Math.max(lastId, delivery.id);

if (deliveredAt.getTime() < Date.now() - timeLimitMs) {
// stop processing if the delivery is too old (for first iteration and performance of further iterations)
console.debug({
notice: 'Stopping at old delivery',
deliveryId: delivery.id,
guid: delivery.guid,
deliveredAt: deliveredAt,
});
return { deliveries, lastId };
}

console.debug({
notice: 'Processing webhook delivery',
deliveryId: delivery.id,
guid: delivery.guid,
status: delivery.status,
deliveredAt: delivery.delivered_at,
redelivery: delivery.redelivery,
});

if (success) {
successfulDeliveries.add(delivery.guid);
continue;
}

if (successfulDeliveries.has(delivery.guid)) {
// do not redeliver deliveries that were already successful
continue;
}

deliveries.set(delivery.guid, { id: delivery.id, deliveredAt, redelivery: delivery.redelivery });
}
}

console.debug({
notice: 'No more webhook deliveries to process',
deliveryId: 'DONE',
guid: 'DONE',
deliveredAt: 'DONE',
});

return { deliveries, lastId };
}

let lastDeliveryIdProcessed = 0;
const failures: Map<string, { id: number; firstDeliveredAt: Date }> = new Map();

export async function handler() {
const octokit = await getAppOctokit();
if (!octokit) {
console.info({
notice: 'Skipping webhook redelivery',
reason: 'App installation might not be configured or the app is not installed.',
});
return;
}

// fetch deliveries since the last processed delivery ID
// for any failures:
// 1. if this is not a redelivery, save the delivery ID and time, and finally retry
// 2. if this is a redelivery, check if the original delivery is still within the time limit and retry if it is
const { deliveries, lastId } = await newDeliveryFailures(octokit, lastDeliveryIdProcessed);
lastDeliveryIdProcessed = Math.max(lastDeliveryIdProcessed, lastId);
const timeLimitMs = 1000 * 60 * 60 * 3; // retry for up to 3 hours
for (const [guid, details] of deliveries) {
if (!details.redelivery) {
failures.set(guid, { id: details.id, firstDeliveredAt: details.deliveredAt });
console.log({
notice: 'Redelivering failed delivery',
deliveryId: details.id,
guid: guid,
firstDeliveredAt: details.deliveredAt,
});
await redeliver(octokit, details.id);
} else {
// if this is a redelivery, check if the original delivery is still within the time limit
const originalFailure = failures.get(guid);
if (originalFailure) {
if (new Date().getTime() - originalFailure.firstDeliveredAt.getTime() < timeLimitMs) {
console.log({
notice: 'Redelivering failed delivery',
deliveryId: details.id,
guid: guid,
firstDeliveredAt: originalFailure.firstDeliveredAt,
});
await redeliver(octokit, details.id);
} else {
failures.delete(guid); // no need to keep track of this anymore
console.log({
notice: 'Skipping redelivery of old failed delivery',
deliveryId: details.id,
guid: guid,
firstDeliveredAt: originalFailure?.firstDeliveredAt,
});
}
} else {
console.log({
notice: 'Skipping redelivery of old failed delivery',
deliveryId: details.id,
guid: guid,
});
}
}
}
}
Loading
Loading