diff --git a/contrib/most.ts b/contrib/most.ts new file mode 100644 index 0000000..96165dc --- /dev/null +++ b/contrib/most.ts @@ -0,0 +1,139 @@ +import type { Scheduler, Stream } from "npm:@most/types@1.1.0"; +import type { $, In, InOut, Kind, Out } from "../kind.ts"; +import type { Wrappable } from "../wrappable.ts"; +import type { Mappable } from "../mappable.ts"; +import type { Applicable } from "../applicable.ts"; +import type { Flatmappable } from "../flatmappable.ts"; + +import * as M from "npm:@most/core@1.6.1"; +import { createBind, createTap } from "../flatmappable.ts"; +import { createBindTo } from "../mappable.ts"; +import { flow, pipe } from "../fn.ts"; + +export * from "npm:@most/core@1.6.1"; +export * from "npm:@most/hold@4.1.0"; +export * from "npm:@most/adapter@1.0.0"; +export { newDefaultScheduler, newScheduler } from "npm:@most/scheduler@1.3.0"; + +export interface KindStream extends Kind { + readonly kind: Stream>; +} + +export function count(sa: Stream): Stream { + return keepIndex(withCount(sa)); +} + +export function withCount(sa: Stream): Stream<[number, A]> { + return withIndexStart(1, sa); +} + +export function index(sa: Stream): Stream { + return keepIndex(withIndex(sa)); +} + +export function withIndex(sa: Stream): Stream<[number, A]> { + return withIndexStart(0, sa); +} + +export function withIndexStart( + start: number, + sa: Stream, +): Stream<[number, A]> { + return indexed((i) => [i, i + 1], start, sa); +} + +export function indexed( + f: (s: S) => [I, S], + init: S, + sa: Stream, +): Stream<[I, A]> { + return M.loop( + (s, a) => { + const [index, seed] = f(s); + return { seed, value: [index, a] }; + }, + init, + sa, + ); +} + +export function keepIndex(s: Stream<[I, unknown]>): Stream { + return M.map((ia) => ia[0], s); +} + +export async function collect( + stream: Stream, + scheduler: Scheduler, +): Promise { + const as: A[] = []; + await M.runEffects(pipe(stream, M.tap((a) => as.push(a))), scheduler); + return as; +} + +export const wrap: Wrappable["wrap"] = M.now; + +export const map: Mappable["map"] = M.map; + +export const apply: Applicable["apply"] = M.ap; + +export const flatmap: Flatmappable["flatmap"] = M.chain; + +export const WrappableStream: Wrappable = { wrap }; + +export const MappableStream: Mappable = { map }; + +export const ApplicableStream: Applicable = { wrap, map, apply }; + +export const FlatmappableStream: Flatmappable = { + wrap, + map, + apply, + flatmap, +}; + +export const bind = createBind(FlatmappableStream); + +export const bindTo = createBindTo(FlatmappableStream); + +export const tap = createTap(FlatmappableStream); + +export interface TransformStream extends Kind { + readonly kind: Stream< + $< + U, + [Out, Out, Out], + [In], + [InOut] + > + >; +} + +export function transformStream( + FM: Flatmappable, + extract: < + I, + J = unknown, + K = unknown, + L = never, + M = never, + B = unknown, + C = unknown, + D = never, + E = never, + >( + usua: $>, B, C], [D], [E]>, + ) => Stream<$>, +): Flatmappable> { + return { + wrap: (a) => wrap(FM.wrap(a)), + map: (fai) => map(FM.map(fai)), + apply: M.combine(FM.apply) as Flatmappable>["apply"], + flatmap: (faui) => (sua) => + pipe( + sua, + flatmap(flow(FM.map(faui), extract)), + ), + }; +} + +export type * from "npm:@most/types@1.1.0"; /** Export types */ diff --git a/examples/most.ts b/examples/most.ts new file mode 100644 index 0000000..a47ce4a --- /dev/null +++ b/examples/most.ts @@ -0,0 +1,13 @@ +import * as M from "../contrib/most.ts"; +import { pipe } from "../fn.ts"; + +const stream = pipe( + pipe(M.periodic(1000), M.scan((a) => a + 1, 0)), + M.bindTo("seconds"), + M.bind("timestamps", () => M.wrap(Date.now())), + M.tap((a) => console.log(a)), + M.take(10), +); + +// Strangely, this emits the first two events quickly. +await M.runEffects(stream)(M.newDefaultScheduler()); diff --git a/testing/contrib/most.test.ts b/testing/contrib/most.test.ts new file mode 100644 index 0000000..4043cb1 --- /dev/null +++ b/testing/contrib/most.test.ts @@ -0,0 +1,11 @@ +import { assertEquals } from "https://deno.land/std@0.103.0/testing/asserts.ts"; + +import type { Stream } from "../../contrib/most.ts"; +import * as M from "../../contrib/most.ts"; + +const scheduler = M.newDefaultScheduler(); +const run = (s: Stream): Promise => M.collect(s, scheduler); + +Deno.test("Most wrap", async () => { + assertEquals(await run(M.wrap(1)), [1]); +});