Skip to content

Commit 68000b4

Browse files
kaizenccrix0rrr
andauthored
fix: npm follower occasionally drops incoming updates (#1831)
The npm follower grabs information from multiple sources that can be out of sync. This subsequently causes some new package updates to not be identified and sent to ingestion. <img width="433" height="612" alt="Screenshot 2026-01-07 at 6 03 43 PM" src="https://github.com/user-attachments/assets/99c4a8d5-cd90-43d0-9d7a-f8cf1e56683f" /> Basically, with the new NPM API, our NPM follower ingests information from multiple sources. Occasionally there is a delay between picking up a new change from the NPM changes stream and the change actually showing up in the specific package metadata. Previously, we would just drop this change altogether as unactionable. Subsequent updates to the package could trigger another metadata lookup that would grab the change, but this results in an indeterminate delay. Now, we identify that the metadata is out of sync by comparing the sequential part of the `rev` property. If the metadata `rev` is below the changes `rev`, we know it is lagging behind and exponentially retry until the metadata is updated or until 30 seconds (where we then proceed with the out of sync data). The NPM follower function does some basic math for when to grab the next batch and when to exit and wait for a subsequent lambda execution. Because this change increases the batch time by up to 30 seconds, I have increased the expected worst-case-scenario time by 30 seconds as well. ---- *By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license* --------- Co-authored-by: Rico Huijbers <[email protected]>
1 parent 64e4485 commit 68000b4

File tree

4 files changed

+63
-19
lines changed

4 files changed

+63
-19
lines changed

src/__tests__/__snapshots__/construct-hub.test.ts.snap

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/__tests__/devapp/__snapshots__/snapshot.test.ts.snap

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/package-sources/npmjs/couch-changes.lambda-shared.ts

Lines changed: 55 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ const DEFAULT_BATCH_SIZE = 100;
1616

1717
const MAX_CONNS_PER_HOST = 100;
1818

19+
const MAX_PACKAGE_SERVER_LAG_MS = 30_000; // 30 seconds
20+
1921
/**
2022
* A utility class that helps with traversing CouchDB database changes streams
2123
* in a promise-based, page-by-page manner.
@@ -82,29 +84,60 @@ export class CouchChanges extends EventEmitter {
8284
};
8385
}
8486

87+
/**
88+
* Fetch the metadata associated with a change. The change comes associated with a revision number,
89+
* which can be compared to the revision number of the metadata to determine if the replica is
90+
* lagging behind the changes stream. If so, we retry until the replica is up-to-date or until
91+
* 30 seconds elapsed after which we return the potentially stale metadata.
92+
*/
8593
private async fetchAndFilterMetadata(change: DatabaseChange) {
8694
// Filter out deleted packages or null ids
8795
if (change.deleted || !change.id) {
8896
console.log(`Skipping ${change.id}: deleted or null id`);
8997
return;
9098
}
9199

100+
const latestChangesRev = getMaxSequentialRevision(change);
92101
const metadataUrl = new URL(change.id, NPM_REGISTRY_URL);
93102
console.log(`Fetching metadata for ${change.id}: ${metadataUrl}`);
94103

95-
try {
96-
const meta = await this.https('get', metadataUrl);
97-
change.doc = meta; // add metadata to the change object
98-
return change;
99-
} catch (e: any) {
100-
if (e.message?.includes('HTTP 404')) {
101-
console.log(
102-
`Skipping ${change.id} because of HTTP 404 (Not Found) error`
103-
);
104-
return;
104+
// Retry configuration
105+
const baseDelay = 1_000; // 1 second
106+
const maxDelay = 8_000; // 8 seconds max
107+
let attempt = 0;
108+
const startTime = Date.now();
109+
110+
while(Date.now() - startTime < MAX_PACKAGE_SERVER_LAG_MS) {
111+
try {
112+
const meta = await this.https('get', metadataUrl);
113+
const latestReplicaRev = parseSequentialRevision(meta._rev as string);
114+
115+
change.doc = meta; // add metadata to the change object
116+
117+
// Happy path: replica is up-to-date
118+
if (latestReplicaRev >= latestChangesRev) {
119+
return change;
120+
}
121+
122+
// Unhappy path: replica is behind. Calculate delay and retry
123+
const delay = Math.floor(Math.random() * Math.min(baseDelay * Math.pow(2, attempt), maxDelay));
124+
console.log(`${change.id}: package _rev ${latestReplicaRev} < expected replication rev ${latestChangesRev}, retrying in ${delay} ms`);
125+
await new Promise(resolve => setTimeout(resolve, delay));
126+
attempt++;
127+
} catch (e: any) {
128+
if (e.message?.includes('HTTP 404')) {
129+
console.log(
130+
`Skipping ${change.id} because of HTTP 404 (Not Found) error`
131+
);
132+
return;
133+
}
134+
throw e;
105135
}
106-
throw e;
107136
}
137+
138+
// Timeout reached, proceed with stale data
139+
console.log(`Timeout reached for ${change.id}, replica may be stale`);
140+
return change;
108141
}
109142

110143
private async fetchAndFilterAllMetadata(
@@ -245,6 +278,17 @@ async function sleep(ms: number) {
245278
return new Promise((ok) => setTimeout(ok, ms));
246279
}
247280

281+
function parseSequentialRevision(rev: string): number {
282+
return parseInt(rev.split('-')[0]);
283+
}
284+
285+
function getMaxSequentialRevision(change: DatabaseChange): number {
286+
return Math.max(...change.changes
287+
.map(change => parseSequentialRevision(change.rev))
288+
.filter(num => !isNaN(num))
289+
);
290+
}
291+
248292
export interface DatabaseChanges {
249293
/**
250294
* The last sequence ID from this change set. This is the value that should be

src/package-sources/npmjs/npm-js-follower.lambda.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,9 @@ export async function handler(event: ScheduledEvent, context: Context) {
9797
// The last written marker seq id.
9898
let updatedMarker = initialMarker;
9999

100-
// The slowest batch processing time so far (starts at 30 seconds). This is how much time should
100+
// The slowest batch processing time so far (starts at 60 seconds). This is how much time should
101101
// be left before timeout if a new batch is to be fetched.
102-
let maxBatchProcessingTime = 30_000;
102+
let maxBatchProcessingTime = 60_000;
103103
// Whether we should continue reading more items or not... This is set to false when the current
104104
// latest change is reached (i.e: next page of changes is empty).
105105
let shouldContinue = true;

0 commit comments

Comments
 (0)