Skip to content

Commit

Permalink
feat: initial implementation of contrib/most.ts
Browse files Browse the repository at this point in the history
Create the initial type classes for most.js Stream. Reexport a few most
libraries for convenience:

* @most/core
* @most/types
* @most/scheduler#{newScheduler, newDefaultScheduler}
* @most/hold

It seems @most/index is not published to npm so I've replicated the primary
exports here. They are MIT licensed.

Additionally, made a dangerous collect function like the ones in iterable.ts and
async_iterable.ts. This is primarily useful for testing. I should probably make
the returned promise abortable.
  • Loading branch information
baetheus committed Nov 28, 2023
1 parent a73e11b commit 32e5293
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 0 deletions.
139 changes: 139 additions & 0 deletions contrib/most.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import type { Scheduler, Stream } from "npm:@most/[email protected]";
import type { $, In, InOut, Kind, Out } from "../kind.ts";
import type { Wrappable } from "../wrappable.ts";
import type { Mappable } from "../mappable.ts";
import type { Applicable } from "../applicable.ts";
import type { Flatmappable } from "../flatmappable.ts";

import * as M from "npm:@most/[email protected]";
import { createBind, createTap } from "../flatmappable.ts";
import { createBindTo } from "../mappable.ts";
import { flow, pipe } from "../fn.ts";

export * from "npm:@most/[email protected]";
export * from "npm:@most/[email protected]";
export * from "npm:@most/[email protected]";
export { newDefaultScheduler, newScheduler } from "npm:@most/[email protected]";

export interface KindStream extends Kind {
readonly kind: Stream<Out<this, 0>>;
}

export function count<A>(sa: Stream<A>): Stream<number> {
return keepIndex(withCount(sa));
}

export function withCount<A>(sa: Stream<A>): Stream<[number, A]> {
return withIndexStart(1, sa);
}

export function index<A>(sa: Stream<A>): Stream<number> {
return keepIndex(withIndex(sa));
}

export function withIndex<A>(sa: Stream<A>): Stream<[number, A]> {
return withIndexStart(0, sa);
}

export function withIndexStart<A>(
start: number,
sa: Stream<A>,
): Stream<[number, A]> {
return indexed((i) => [i, i + 1], start, sa);
}

export function indexed<S, I, A>(
f: (s: S) => [I, S],
init: S,
sa: Stream<A>,
): Stream<[I, A]> {
return M.loop(
(s, a) => {
const [index, seed] = f(s);
return { seed, value: [index, a] };
},
init,
sa,
);
}

export function keepIndex<I>(s: Stream<[I, unknown]>): Stream<I> {
return M.map((ia) => ia[0], s);
}

export async function collect<A>(
stream: Stream<A>,
scheduler: Scheduler,
): Promise<readonly A[]> {
const as: A[] = [];
await M.runEffects(pipe(stream, M.tap((a) => as.push(a))), scheduler);
return as;
}

export const wrap: Wrappable<KindStream>["wrap"] = M.now;

export const map: Mappable<KindStream>["map"] = M.map;

export const apply: Applicable<KindStream>["apply"] = M.ap;

export const flatmap: Flatmappable<KindStream>["flatmap"] = M.chain;

export const WrappableStream: Wrappable<KindStream> = { wrap };

export const MappableStream: Mappable<KindStream> = { map };

export const ApplicableStream: Applicable<KindStream> = { wrap, map, apply };

export const FlatmappableStream: Flatmappable<KindStream> = {
wrap,
map,
apply,
flatmap,
};

export const bind = createBind(FlatmappableStream);

export const bindTo = createBindTo(FlatmappableStream);

export const tap = createTap(FlatmappableStream);

export interface TransformStream<U extends Kind> extends Kind {
readonly kind: Stream<
$<
U,
[Out<this, 0>, Out<this, 1>, Out<this, 2>],
[In<this, 0>],
[InOut<this, 0>]
>
>;
}

export function transformStream<U extends Kind>(
FM: Flatmappable<U>,
extract: <
I,
J = unknown,
K = unknown,
L = never,
M = never,
B = unknown,
C = unknown,
D = never,
E = never,
>(
usua: $<U, [Stream<$<U, [I, J, K], [L], [M]>>, B, C], [D], [E]>,
) => Stream<$<U, [I, J | B, K | C], [L & D], [M & E]>>,
): Flatmappable<TransformStream<U>> {
return {
wrap: (a) => wrap(FM.wrap(a)),
map: (fai) => map(FM.map(fai)),
apply: M.combine(FM.apply) as Flatmappable<TransformStream<U>>["apply"],
flatmap: (faui) => (sua) =>
pipe(
sua,
flatmap(flow(FM.map(faui), extract)),
),
};
}

export type * from "npm:@most/[email protected]"; /** Export types */
13 changes: 13 additions & 0 deletions examples/most.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import * as M from "../contrib/most.ts";
import { pipe } from "../fn.ts";

const stream = pipe(
pipe(M.periodic(1000), M.scan((a) => a + 1, 0)),
M.bindTo("seconds"),
M.bind("timestamps", () => M.wrap(Date.now())),
M.tap((a) => console.log(a)),
M.take(10),
);

// Strangely, this emits the first two events quickly.
await M.runEffects(stream)(M.newDefaultScheduler());
11 changes: 11 additions & 0 deletions testing/contrib/most.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { assertEquals } from "https://deno.land/[email protected]/testing/asserts.ts";

import type { Stream } from "../../contrib/most.ts";
import * as M from "../../contrib/most.ts";

const scheduler = M.newDefaultScheduler();
const run = <A>(s: Stream<A>): Promise<readonly A[]> => M.collect(s, scheduler);

Deno.test("Most wrap", async () => {
assertEquals(await run(M.wrap(1)), [1]);
});

0 comments on commit 32e5293

Please sign in to comment.