Skip to content

Commit 84d1150

Browse files
Build FRP stream graph out of subscribers (#44)
Stream publishers now hold a list of Sendables (subscribers with a send method). Holding a list of subscribers allows us to provide traversal and serialization methods on subscribers, while returning a readonly handle to derived streams.
1 parent bd8056e commit 84d1150

File tree

5 files changed

+139
-86
lines changed

5 files changed

+139
-86
lines changed
Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
1-
import { generate } from './stream.js'
1+
import { subject, readonly, __sink__ } from './stream.js'
22

33
export const events = (
44
element: HTMLElement,
55
name: string
6-
) => generate((send: (value: Event) => void) => {
7-
element.addEventListener(name, send)
8-
return () => element.removeEventListener(name, send)
9-
})
6+
) => {
7+
const event = subject()
8+
element.addEventListener(name, event.send)
9+
return readonly({
10+
...event,
11+
cancel: () => element.removeEventListener(name, event.send)
12+
})
13+
}

typescript/packages/common-frp/src/operators.ts

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import {
22
map as mapStream,
3-
select as selectStream,
43
filter as filterStream,
54
join as joinStreams,
65
chooseLeft,
@@ -99,12 +98,6 @@ export const map = <T, U>(
9998
stream: Stream<T>
10099
) => mapStream(stream, transform)
101100

102-
export const select = <T extends object, U extends keyof T & string>(
103-
key: U
104-
) => (
105-
stream: Stream<T>
106-
) => selectStream(stream, key)
107-
108101
/** Filter a stream of values using a predicate function. */
109102
export const filter = <T>(
110103
predicate: UnaryFn<T, boolean>

typescript/packages/common-frp/src/publisher.ts

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,19 @@ export type Sendable<T> = {
1313
send: Send<T>
1414
}
1515

16+
export type Publisher<T> = {
17+
[Symbol.iterator]: () => Iterator<Sendable<T>>;
18+
send: (value: T) => void;
19+
sink: (subscriber: Sendable<T>) => Cancel;
20+
}
21+
1622
/** Low-level pub-sub channel used under the hood. */
17-
export const publisher = <T>() => {
18-
const subscribers = new Set<Send<T>>()
23+
export const publisher = <T>(): Publisher<T> => {
24+
const subscribers = new Set<Sendable<T>>()
1925

20-
const pub = (value: T) => {
26+
const send = (value: T) => {
2127
for (const subscriber of subscribers) {
22-
subscriber(value)
28+
subscriber.send(value)
2329
}
2430
}
2531

@@ -28,7 +34,7 @@ export const publisher = <T>() => {
2834
* @param subscriber the function to call when a new value is published
2935
* @returns Unsubscribe function
3036
*/
31-
const sub = (subscriber: Send<T>): Cancel => {
37+
const sink = (subscriber: Sendable<T>): Cancel => {
3238
debug('sub', 'subscribing', subscriber)
3339
subscribers.add(subscriber)
3440
return () => {
@@ -40,8 +46,8 @@ export const publisher = <T>() => {
4046
return {
4147
// Allow iterating over subscribers
4248
[Symbol.iterator]: () => subscribers.values(),
43-
pub,
44-
sub
49+
send,
50+
sink
4551
}
4652
}
4753

typescript/packages/common-frp/src/signal.ts

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import { debug } from './shared.js'
22
import {
33
publisher,
4-
Send,
54
Sendable,
65
Cancel,
76
Cancellable,
@@ -65,7 +64,7 @@ const { withUpdates, withReads } = createTransactionManager()
6564
export const __updates__ = Symbol('updates')
6665

6766
export type Updates<T> = {
68-
[__updates__]: (subscriber: Send<T>) => Cancel
67+
[__updates__]: (subscriber: Sendable<T>) => Cancel
6968
}
7069

7170
const isEqual = Object.is
@@ -160,8 +159,10 @@ export const effect: Effect = (
160159

161160
job()
162161

162+
const subscriber = { send: schedule }
163+
163164
return combineCancels(
164-
upstreams.map(signal => signal[__updates__](schedule))
165+
upstreams.map(signal => signal[__updates__](subscriber))
165166
)
166167
}
167168

@@ -177,7 +178,7 @@ export const state = <T>(initial: T) => {
177178
if (!isEqual(state, value)) {
178179
state = value
179180
debug('state', 'value updated', state)
180-
updates.pub()
181+
updates.send()
181182
}
182183
}
183184

@@ -186,7 +187,7 @@ export const state = <T>(initial: T) => {
186187
return {
187188
get,
188189
send,
189-
[__updates__]: updates.sub
190+
[__updates__]: updates.sink
190191
}
191192
}
192193

@@ -280,11 +281,13 @@ export const computed: Computed = (
280281
const performUpdate = () => {
281282
debug('computed', 'mark dirty')
282283
isDirty = true
283-
updates.pub()
284+
updates.send()
284285
}
285286

287+
const subscriber = { send: performUpdate }
288+
286289
const cancel = combineCancels(
287-
upstreams.map(signal => signal[__updates__](performUpdate))
290+
upstreams.map(signal => signal[__updates__](subscriber))
288291
)
289292

290293
const get = () => {
@@ -298,7 +301,7 @@ export const computed: Computed = (
298301

299302
return {
300303
get,
301-
[__updates__]: updates.sub,
304+
[__updates__]: updates.sink,
302305
cancel
303306
}
304307
}

0 commit comments

Comments
 (0)