11import { debug } from "./shared.js"
22import { publisher , Send , Cancel , combineCancels } from "./publisher.js"
3- import { signal as signal , Signal , __updates__ } from "./signal.js"
3+ import { state , Signal , __updates__ } from "./signal.js"
44
55const __sink__ = Symbol ( 'sink' )
66
@@ -18,20 +18,33 @@ export const sink = <T>(
1818 subscriber : Send < T >
1919) => upstream [ __sink__ ] ( subscriber )
2020
21- /** Create a new stream source */
22- export const stream = < T > (
21+ /**
22+ * Create a stream subject - a source for a stream that has a send method
23+ * for publishing new items to stream.
24+ */
25+ export const subject = < T > ( ) => {
26+ const { pub, sub } = publisher < T > ( )
27+ return { [ __sink__ ] : sub , send : pub }
28+ }
29+
30+ /**
31+ * Create a new stream source using a closure to publish new items to stream
32+ * Closure receives a single argument, a send function to publish new items,
33+ * and may return a cancel function to stop generation and clean up resources.
34+ */
35+ export const generate = < T > (
2336 generate : ( send : Send < T > ) => Cancel | undefined
2437) : Stream < T > => {
25- const { pub , sub } = publisher < T > ( )
26- const cancel = generate ( pub )
27- return { [ __sink__ ] : sub , cancel }
38+ const { [ __sink__ ] : sink , send } = subject < T > ( )
39+ const cancel = generate ( send )
40+ return { [ __sink__ ] : sink , cancel }
2841}
2942
3043/** Map a stream of values */
3144export const map = < T , U > (
3245 upstream : Stream < T > ,
3346 transform : ( value : T ) => U
34- ) => stream < U > ( send => {
47+ ) => generate < U > ( send => {
3548 return sink ( upstream , value => send ( transform ( value ) ) )
3649} )
3750
@@ -51,7 +64,7 @@ export const select = <T extends object, U extends keyof T & string>(
5164export const filter = < T > (
5265 upstream : Stream < T > ,
5366 predicate : ( value : T ) => boolean
54- ) => stream < T > ( send => {
67+ ) => generate < T > ( send => {
5568 return sink ( upstream , value => {
5669 if ( predicate ( value ) ) {
5770 send ( value )
@@ -67,7 +80,7 @@ export const zip = <T, U, V>(
6780 left : Stream < T > ,
6881 right : Stream < U > ,
6982 combine : ( left : T , right : U ) => V
70- ) => stream < V > ( send => {
83+ ) => generate < V > ( send => {
7184 const leftQueue : Array < T > = [ ]
7285 const rightQueue : Array < U > = [ ]
7386
@@ -100,7 +113,7 @@ export const scan = <T, U>(
100113 step : ( state : U , value : T ) => U ,
101114 initial : U
102115) : Signal < U > => {
103- const { get, [ __updates__ ] : updates , send } = signal ( initial )
116+ const { get, [ __updates__ ] : updates , send } = state ( initial )
104117 const cancel = sink ( upstream , ( value : T ) => {
105118 send ( step ( get ( ) , value ) )
106119 } )
0 commit comments