Skip to content

Commit

Permalink
async generators working in channels
Browse files Browse the repository at this point in the history
  • Loading branch information
martypdx committed Mar 5, 2024
1 parent 8c2cefe commit 3a21ed0
Show file tree
Hide file tree
Showing 4 changed files with 331 additions and 126 deletions.
77 changes: 50 additions & 27 deletions packages/channels/channels.js
Original file line number Diff line number Diff line change
@@ -1,54 +1,77 @@
import { Multicast } from './generators.js';

export function use(asyncSource, ...args) {
const [transforms, options] = processArguments(args);
const [channels, options] = getArguments(args);
const type = typeof asyncSource;

switch(true) {
case asyncSource instanceof Promise:
return transforms.length < 2
? [fromPromise(asyncSource, transforms[0], options)]
: branchPromise(asyncSource, transforms, options);
return channels.length < 2
? [fromPromise(asyncSource, channels[0], options)]
: branchPromise(asyncSource, channels, options);
case !!asyncSource[Symbol.asyncIterator]:
return channels.length < 2
? [fromAsyncIterator(asyncSource, channels[0], options)]
: branchAsyncIterator(asyncSource, channels, options);
default:
throwAsyncSourceTypeError(type);
}
}

function fromPromise(promise, transform, options) {
function getArguments(channels) {
let options = null;
if(channels.length) {
const maybeOptions = channels.at(-1);
if(typeof maybeOptions === 'object') {
options = maybeOptions;
channels.length--;
}
}
return [channels, options];
}

function fromPromise(promise, channel, options) {
const startWith = options?.startWith;
if(startWith) {
return fromPromiseStartWith(promise, transform, startWith);
return fromPromiseStartWith(promise, channel, startWith);
}
return [channel ? promise.then(channel) : promise];
}

async function* fromPromiseStartWith(promise, channel, startWith) {
yield startWith;
yield channel ? promise.then(channel) : promise;
}

async function* fromAsyncIterator(iterator, channel, options) {
const startWith = options?.startWith;
if(startWith) yield startWith;
for await(const value of iterator) {
yield channel ? channel(value) : value;
}
return [transform ? promise.then(transform) : promise];
}

function branchPromise(promise, transforms) {
return transforms.map(transform => {
if(Array.isArray(transform)) { // [transform, options]
return fromPromise(promise, transform[0], transform[1]);
function branchPromise(promise, channels) {
return channels.map(channel => {
if(Array.isArray(channel)) { // [channel, options]
return fromPromise(promise, channel[0], channel[1]);
}
return promise.then(transform);
return promise.then(channel);
});
}

async function* fromPromiseStartWith(promise, transform, startWith) {
yield startWith;
yield transform ? promise.then(transform) : promise;
function branchAsyncIterator(iterator, channels, options) {
const multicast = new Multicast(iterator);
return channels.map(channel => {
if(Array.isArray(channel)) { // [channel, options]
return multicast.subscriber(channel[0], channel[1]);
}
return multicast.subscriber(channel, options);
});
}

function throwAsyncSourceTypeError(type) {
throw new TypeError(`\
Unexpected asynchronous data source type "${type}". Expected an async data provider type, or \
a function that returns an async data provider type."`);
}

function processArguments(transforms) {
let options = null;
if(transforms.length) {
const maybeOptions = transforms.at(-1);
if(typeof maybeOptions === 'object') {
options = maybeOptions;
transforms.length--;
}
}
return [transforms, options];
}
Loading

0 comments on commit 3a21ed0

Please sign in to comment.