Skip to content

Commit e1e5daa

Browse files
domfarolinomoz-wptsync-bot
authored andcommitted
Bug 1948370 [wpt PR 50723] - DOM: Implement ref-counted Observable producer, a=testonly
Automatic update from web-platform-tests DOM: Implement ref-counted Observable producer This CL implements ref-counted producers, which came out of the W3C TPAC discussions in 2024. After this CL, multiple `ObservableInternalObserver` objects can be associated/registered with a single active `Subscriber`. The main meat of this CL involves the logic managing consumer unsubscription, to ensure that only once *all* associated consumers/observers unsubscribe do we actually close down a `Subscriber`. See WICG/observable#197. R=masonf Bug: 363015168 Change-Id: I67b63a3f4e38bf5be0236fd1b8f025648a3089bf Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/6221901 Reviewed-by: Mason Freed <[email protected]> Commit-Queue: Dominic Farolino <[email protected]> Cr-Commit-Position: refs/heads/main@{#1420647} -- wpt-commits: 976ac8e83a9197bf3153d902435aea5620da8df6 wpt-pr: 50723
1 parent f467d0c commit e1e5daa

File tree

2 files changed

+158
-48
lines changed

2 files changed

+158
-48
lines changed

Diff for: testing/web-platform/tests/dom/observable/tentative/observable-constructor.any.js

+108
Original file line numberDiff line numberDiff line change
@@ -1091,3 +1091,111 @@ test(() => {
10911091
assert_array_equals(addTeardownsCalled, ["teardown 1", "teardown 2"],
10921092
"Teardowns called synchronously upon addition end up in FIFO order");
10931093
}, "Teardowns should be called synchronously during addTeardown() if the subscription is inactive");
1094+
1095+
test(() => {
1096+
const results = [];
1097+
let producerInvocations = 0;
1098+
let teardownInvocations = 0;
1099+
1100+
const source = new Observable((subscriber) => {
1101+
producerInvocations++;
1102+
results.push('producer invoked');
1103+
subscriber.addTeardown(() => {
1104+
teardownInvocations++;
1105+
results.push('teardown invoked');
1106+
});
1107+
});
1108+
1109+
const ac1 = new AbortController();
1110+
const ac2 = new AbortController();
1111+
1112+
// First subscription.
1113+
source.subscribe({}, {signal: ac1.signal});
1114+
assert_equals(producerInvocations, 1,
1115+
"Producer is invoked once for first subscription");
1116+
1117+
// Second subscription should reuse the same producer.
1118+
source.subscribe({}, {signal: ac2.signal});
1119+
assert_equals(producerInvocations, 1,
1120+
"Producer should not be invoked again for second subscription");
1121+
1122+
// First unsubscribe.
1123+
ac1.abort();
1124+
assert_equals(teardownInvocations, 0,
1125+
"Teardown not run when first subscriber unsubscribes");
1126+
1127+
// Second unsubscribe.
1128+
ac2.abort();
1129+
assert_equals(teardownInvocations, 1,
1130+
"Teardown should run after last subscriber unsubscribes");
1131+
1132+
assert_array_equals(results, ['producer invoked', 'teardown invoked']);
1133+
}, "Multiple subscriptions share the same producer and teardown runs only " +
1134+
"after last subscription abort");
1135+
1136+
test(() => {
1137+
const results = [];
1138+
let activeSubscriber = null;
1139+
1140+
const source = new Observable(subscriber => {
1141+
activeSubscriber = subscriber;
1142+
results.push('producer start');
1143+
subscriber.addTeardown(() => results.push('teardown'));
1144+
});
1145+
1146+
// First subscription.
1147+
const ac1 = new AbortController();
1148+
source.subscribe({}, {signal: ac1.signal});
1149+
assert_array_equals(results, ['producer start']);
1150+
1151+
// Second subscription.
1152+
const ac2 = new AbortController();
1153+
source.subscribe({}, {signal: ac2.signal});
1154+
1155+
// Complete the subscription.
1156+
activeSubscriber.complete();
1157+
assert_array_equals(results, ['producer start', 'teardown']);
1158+
1159+
// Additional subscription after complete.
1160+
const ac3 = new AbortController();
1161+
source.subscribe({}, {signal: ac3.signal});
1162+
1163+
assert_array_equals(results, ['producer start', 'teardown', 'producer start']);
1164+
}, "New subscription after complete creates new producer");
1165+
1166+
test(() => {
1167+
const results = [];
1168+
let producerInvocations = 0;
1169+
1170+
const source = new Observable(subscriber => {
1171+
producerInvocations++;
1172+
results.push('producer start');
1173+
subscriber.addTeardown(() => results.push('teardown'));
1174+
});
1175+
1176+
// Create 3 subscriptions.
1177+
const ac1 = new AbortController();
1178+
const ac2 = new AbortController();
1179+
const ac3 = new AbortController();
1180+
source.subscribe({}, {signal: ac1.signal});
1181+
source.subscribe({}, {signal: ac2.signal});
1182+
source.subscribe({}, {signal: ac3.signal});
1183+
1184+
assert_equals(producerInvocations, 1, "Producer should be invoked once");
1185+
1186+
// Unsubscribe in a different order.
1187+
ac2.abort();
1188+
results.push('after first abort');
1189+
ac1.abort();
1190+
results.push('after second abort');
1191+
ac3.abort();
1192+
results.push('after final abort');
1193+
1194+
assert_array_equals(results, [
1195+
'producer start',
1196+
'after first abort',
1197+
'after second abort',
1198+
'teardown',
1199+
'after final abort'
1200+
]);
1201+
}, "Teardown runs after last unsubscribe regardless of unsubscription order");

Diff for: testing/web-platform/tests/dom/observable/tentative/observable-from.any.js

+50-48
Original file line numberDiff line numberDiff line change
@@ -674,38 +674,21 @@ promise_test(async t => {
674674

675675
// This test is a more chaotic version of the above. It ensures that a single
676676
// Observable can handle multiple in-flight subscriptions to the same underlying
677-
// async iterable without the two subscriptions competing.
678-
//
679-
// This test is added because it is easy to imagine an implementation whereby
680-
// upon subscription, the Observable's internal subscribe callback takes the
681-
// underlying async iterable object, and simply pulls the async iterator off of
682-
// it (by invoking `@@asyncIterator`), and saves it alongside the underlying
683-
// async iterable. This async iterator would be used to manage values as they
684-
// are asynchronously emitted from the underlying object, but this value can get
685-
// OVERWRITTEN by a brand new subscription that comes in before the first
686-
// subscription has completed. In a broken implementation, this overwriting
687-
// would prevent the first subscription from ever completing.
677+
// async iterable without the two subscriptions competing. It asserts that the
678+
// asynchronous values are pushed to the observers in the correct order.
688679
promise_test(async t => {
689680
const async_iterable = {
690-
slow: true,
691681
[Symbol.asyncIterator]() {
692-
// The first time @@asyncIterator is called, `shouldBeSlow` is true, and
693-
// when the return object takes closure of it, all values are emitted
694-
// SLOWLY asynchronously. The second time, `shouldBeSlow` is false, and
695-
// all values are emitted FAST but still asynchronous.
696-
const shouldBeSlow = this.slow;
697-
this.slow = false;
698-
699682
return {
700683
val: 0,
701684
next() {
702685
// Returns a Promise that resolves in a random amount of time less
703686
// than a second.
704687
return new Promise(resolve => {
705688
t.step_timeout(() => resolve({
706-
value: `${this.val}-${shouldBeSlow ? 'slow' : 'fast'}`,
689+
value: this.val,
707690
done: this.val++ === 4 ? true : false,
708-
}), shouldBeSlow ? 200 : 0);
691+
}), 200);
709692
});
710693
},
711694
};
@@ -715,30 +698,46 @@ promise_test(async t => {
715698
const results = [];
716699
const source = Observable.from(async_iterable);
717700

718-
const subscribeFunction = function(resolve, reject) {
701+
const promise = new Promise(resolve => {
719702
source.subscribe({
720-
next: v => results.push(v),
721-
complete: () => resolve(),
703+
next: v => {
704+
results.push(`${v}-first-sub`);
705+
706+
// Half-way through the first subscription, start another subscription.
707+
if (v === 0) {
708+
source.subscribe({
709+
next: v => results.push(`${v}-second-sub`),
710+
complete: () => {
711+
results.push('complete-second-sub');
712+
resolve();
713+
}
714+
});
715+
}
716+
},
717+
complete: () => {
718+
results.push('complete-first-sub');
719+
resolve();
720+
}
722721
});
722+
});
723723

724-
// A broken implementation will rely on this timeout.
725-
t.step_timeout(() => reject('TIMEOUT'), 3000);
726-
}
727-
728-
const slow_promise = new Promise(subscribeFunction);
729-
const fast_promise = new Promise(subscribeFunction);
730-
await Promise.all([slow_promise, fast_promise]);
724+
await promise;
731725
assert_array_equals(results, [
732-
'0-fast',
733-
'1-fast',
734-
'2-fast',
735-
'3-fast',
736-
'0-slow',
737-
'1-slow',
738-
'2-slow',
739-
'3-slow',
726+
'0-first-sub',
727+
728+
'1-first-sub',
729+
'1-second-sub',
730+
731+
'2-first-sub',
732+
'2-second-sub',
733+
734+
'3-first-sub',
735+
'3-second-sub',
736+
737+
'complete-first-sub',
738+
'complete-second-sub',
740739
]);
741-
}, "from(): Asynchronous iterable multiple in-flight subscriptions competing");
740+
}, "from(): Asynchronous iterable multiple in-flight subscriptions");
742741
// This test is like the above, ensuring that multiple subscriptions to the same
743742
// sync-iterable-converted-Observable can exist at a time. Since sync iterables
744743
// push all of their values to the Observable synchronously, the way to do this
@@ -751,24 +750,27 @@ test(() => {
751750
const source = Observable.from(array);
752751
source.subscribe({
753752
next: v => {
754-
results.push(v);
753+
results.push(`${v}-first-sub`);
755754
if (v === 3) {
756755
// Pushes all 5 values to `results` right after the first instance of `3`.
757756
source.subscribe({
758-
next: v => results.push(v),
759-
complete: () => results.push('inner complete'),
757+
next: v => results.push(`${v}-second-sub`),
758+
complete: () => results.push('complete-second-sub'),
760759
});
761760
}
762761
},
763-
complete: () => results.push('outer complete'),
762+
complete: () => results.push('complete-first-sub'),
764763
});
765764

766765
assert_array_equals(results, [
767-
1, 2, 3,
768-
1, 2, 3, 4, 5, 'inner complete',
769-
4, 5, 'outer complete'
766+
// These values are pushed when there is only a single subscription.
767+
'1-first-sub', '2-first-sub', '3-first-sub',
768+
// These values are pushed in the correct order, for two subscriptions.
769+
'4-first-sub', '4-second-sub',
770+
'5-first-sub', '5-second-sub',
771+
'complete-first-sub', 'complete-second-sub',
770772
]);
771-
}, "from(): Sync iterable multiple in-flight subscriptions competing");
773+
}, "from(): Sync iterable multiple in-flight subscriptions");
772774

773775
promise_test(async () => {
774776
const async_generator = async function*() {

0 commit comments

Comments
 (0)