Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
9a5a144
WIP
gordonbrander May 31, 2024
3d58db2
More progress toward cell and stream types
gordonbrander May 31, 2024
3d3f053
Add notes on FRP libs and papers
gordonbrander May 31, 2024
40414f0
More notes
gordonbrander Jun 1, 2024
5d8a679
More notes
gordonbrander Jun 1, 2024
2c92508
More notes
gordonbrander Jun 1, 2024
6880092
Working on read-only signals and computed cells
gordonbrander Jun 2, 2024
1b3db09
Microtask queue-based topological sort
gordonbrander Jun 4, 2024
09689af
More notes
gordonbrander Jun 4, 2024
50a75f3
Simplified impl
gordonbrander Jun 4, 2024
257c227
Remove dead code
gordonbrander Jun 4, 2024
f9c57a3
First pass on transaction-based push-pull FRP
gordonbrander Jun 5, 2024
07fae2c
Update type sig of generateStream
gordonbrander Jun 5, 2024
0d1e2e5
Add typescript overloads for createComputed
gordonbrander Jun 5, 2024
3b0ae76
Better types
gordonbrander Jun 5, 2024
d0541d9
Add separate stream phase to transaction
gordonbrander Jun 5, 2024
32e3799
Clear streams after commit
gordonbrander Jun 5, 2024
897e5f1
Update notes
gordonbrander Jun 6, 2024
7403aec
Add note
gordonbrander Jun 6, 2024
ced4cf5
Add notes
gordonbrander Jun 6, 2024
7574281
Implement signals and streams w separate timelines
gordonbrander Jun 6, 2024
bce9c41
Add pipe and operators
gordonbrander Jun 6, 2024
4df5930
Separate sink/effect from type
gordonbrander Jun 6, 2024
ab6957e
Update notes
gordonbrander Jun 6, 2024
9b7ade8
Add select operator
gordonbrander Jun 6, 2024
ab5531b
Update package
gordonbrander Jun 6, 2024
780fbb6
Update tsconfig
gordonbrander Jun 6, 2024
ad55d54
Fix select operator
gordonbrander Jun 6, 2024
18704d9
Remove index (not used rn)
gordonbrander Jun 6, 2024
9a319a9
Rename publisher
gordonbrander Jun 6, 2024
614e9a8
Merge remote-tracking branch 'origin/main' into 2024-05-30-common-frp
gordonbrander Jun 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
First pass on transaction-based push-pull FRP
- Two transaction phases, updates and reads
- Updates sets state, marks computed dirty, reads recalculates computed
  (using call stack as an implicit topological sorting mechanism)

TODO: write tests
  • Loading branch information
gordonbrander committed Jun 5, 2024
commit f9c57a38ebc257827d71609e6469f48eb2a28cc3
6 changes: 5 additions & 1 deletion sketches/2024-05-30-common-frp/NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,11 @@ Libraries:
- [TC39 Signals Proposal](https://github.com/tc39/proposal-signals)
- [SolidJS](https://www.solidjs.com/)
- [Preact Signals](https://preactjs.com/guide/v10/signals/)
- [S.js](https://github.com/adamhaile/S) implements transactions and dynamic graph with single-transaction callback registration, eliminating listener memory leaks.
- [S.js](https://github.com/adamhaile/S)
- implements transactions and dynamic graph with single-transaction callback registration, eliminating listener memory leaks.
- [Arrow.js](https://www.arrow-js.com/docs/)
- Focuses on reactive objects, rather than values
- Key-path style indexing using Proxy
- [Elm Signals 3.0.0](https://github.com/elm-lang/core/blob/3.0.0/src/Native/Signal.js). Deprecated, but the implementation can be found here. 1st order FRP.

### Observables
Expand Down
308 changes: 184 additions & 124 deletions sketches/2024-05-30-common-frp/src/common-frp.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
/*
TODO: beginnings of a push-pull FRP system using transactions.
This will enable graph cycles.
*/
export const config = {
debug: false
}
Expand All @@ -8,115 +12,159 @@ const debugLog = (tag: string, msg: string) => {
}
}

let _cidCounter = 0
const createTransactionManager = () => {
const updates = new Map<(value: any) => void, any>()
const reads = new Set<() => void>()

/** Create a lifetime-unique client ID, based on incrementing a counter */
const cid = () => `cid${_cidCounter++}`

export const batched = <T>(perform: (value: T) => void) => {
let isScheduled = false
let state: T | undefined = undefined
const schedule = (value: T) => {
state = value
if (!isScheduled) {
isScheduled = true
queueMicrotask(() => {
debugLog('batched', `performing ${state}`)
perform(state!)
isScheduled = false
})

const schedule = () => {
if (isScheduled) {
return
}
debugLog('TransactionManager.schedule', `transaction scheduled`)
isScheduled = true
queueMicrotask(transact)
}
return schedule
}

export type Cancel = () => void

export type Subject<T> = {
send: (value: T) => void
}
const transact = () => {
debugLog('TransactionManager.transact', 'transaction start')
// First perform all cell state changes.
// - Update cell state
// - Mark computed dirty
debugLog('TransactionManager.transact', `transact updates`)
for (const [job, value] of updates) {
job(value)
}
updates.clear()
// Then perform all cell state reads
// - Read cell state
// - Recompute computed cells and mark clean
debugLog('TransactionManager.transact', `transact reads`)
for (const job of reads) {
job()
}
reads.clear()
isScheduled = false
debugLog('TransactionManager.transact', 'transaction end')
}

export type Sink<T> = {
sink: (subscriber: Subject<T>) => Cancel
}
const withUpdates = <T>(job: (value: T) => void, value: T) => {
debugLog('TransactionManager.withUpdates', `queue job with value ${value}`)
updates.set(job, value)
schedule()
}

const pub = <T>(subscribers: Set<Subject<T>>, value: T) => {
debugLog('pub', `dispatching value ${value} to ${subscribers.size} subscribers`)
for (const subscriber of subscribers) {
subscriber.send(value)
const withReads =(job: () => void) => {
debugLog('TransactionManager.withReads', `queue job`)
reads.add(job)
schedule()
}

return {withUpdates, withReads}
}

const sub = <T>(
subscribers: Set<Subject<T>>,
subscriber: Subject<T>
) => {
debugLog('sub', 'subscribing')
subscribers.add(subscriber)
return () => {
debugLog('sub', 'canceling subscription')
subscribers.delete(subscriber)
const {withUpdates, withReads} = createTransactionManager()

export type Unsubscribe = () => void

export type Subscriber<T> = (value: T) => void

/** Low-level pub-sub channel used under the hood by cells and sinks. */
const createPublisher = <T>() => {
const subscribers = new Set<Subscriber<T>>()

const pub = (value: T) => {
debugLog('pub', `dispatching ${value} to ${subscribers.size} subscribers`)
for (const subscriber of subscribers) {
subscriber(value)
}
}

/**
* Subscribe to this publisher
* @param subscriber the function to call when a new value is published
* @returns Unsubscribe function
*/
const sub = (subscriber: Subscriber<T>): Unsubscribe => {
debugLog('sub', 'subscribing')
subscribers.add(subscriber)
return () => {
debugLog('sub', 'canceling subscription')
subscribers.delete(subscriber)
}
}

return {pub, sub}
}

export const combineCancels = (cancels: Array<Cancel>): Cancel => () => {
for (const cancel of cancels) {
/** Combine multiple unsubscribe functions into a single unsubscribe function */
export const combineUnsubscribes = (
unsubscribes: Array<Unsubscribe>
): Unsubscribe => () => {
for (const cancel of unsubscribes) {
cancel()
}
}

export const createStream = <T>() => {
const id = cid()
const downstreams = new Set<Subject<T>>()
/** Symbol for updates subscribe method */
const __updates__: unique symbol = Symbol('updates')

const send = batched((value: T) => {
debugLog(`stream ${id}`, `sending ${value}`)
pub(downstreams, value)
})
export type UpdatesProvider<T> = {
[__updates__]: (subscriber: Subscriber<T>) => Unsubscribe
}

const sink = (subscriber: Subject<T>) => sub(downstreams, subscriber)
export type SinkProvider<T> = {
sink: (subscriber: Subscriber<T>) => Unsubscribe
}

return {send, sink}
export type ReadStream<T> = SinkProvider<T> & UpdatesProvider<T> & {
unsubscribe?: Unsubscribe
}

const noOp = () => {}
export const createStream = <T>() => {
const updates = createPublisher<T>()

const performUpdate = (value: T) => {
debugLog(`stream`, `value: ${value}`)
updates.pub(value)
}

const send = (value: T) => withUpdates(performUpdate, value)

const sink = (subscriber: Subscriber<T>) => updates.sub(
(value: T) => withReads(() => subscriber(value))
)

return {send, [__updates__]: updates.sub, sink}
}

/**
* Generate a stream using a callback.
* Returns a read-only stream.
*/
export const generateStream = <T>(
produce: (send: (value: T) => void) => Cancel|void
) => {
const {send, sink} = createStream<T>()
const cancel = produce(send) ?? noOp
return {cancel, sink}
generate: (send: (value: T) => void) => Unsubscribe|undefined
): ReadStream<T> => {
const {send, [__updates__]: updates, sink} = createStream<T>()
const unsubscribe = generate(send)
return {[__updates__]: updates, sink, unsubscribe}
}

export const mapStream = <T, U>(
upstream: Sink<T>,
stream: ReadStream<T>,
transform: (value: T) => U
) => {
const transformed = createStream<U>()

const cancel = upstream.sink({
send: value => transformed.send(transform(value))
})

return {cancel, sink: transformed.sink}
}
) => generateStream((send) => {
const subscribe = (value: T) => send(transform(value))
return stream[__updates__](subscribe)
})

export const filterStream = <T>(
upstream: Sink<T>,
predicate: (value: T) => boolean
) => generateStream(send => {
return upstream.sink({
send: value => {
if (predicate(value)) {
send(value)
}
export const filterStream = <T, U>(
stream: ReadStream<T>,
predicate: (value: T) => U
) => generateStream((send) => {
const subscribe = (value: T) => {
if (predicate(value)) {
send(value)
}
})
}
return stream[__updates__](subscribe)
})

const isEqual = Object.is
Expand All @@ -127,79 +175,91 @@ export type Gettable<T> = {

export const sample = <T>(container: Gettable<T>) => container.get()

export type CellLike<T> = {
get(): T
[__updates__]: (subscriber: Subscriber<T>) => Unsubscribe
sink: (subscriber: Subscriber<T>) => Unsubscribe
}

export const createCell = <T>(initial: T) => {
const id = cid()
const downstreams = new Set<Subject<T>>()
const updates = createPublisher<void>()

let state = initial

const get = () => state

const send = batched((value: T) => {
// Only notify downstream if state has changed value
const performUpdate = (value: T) => {
// Only perform update if state has actually changed
if (!isEqual(state, value)) {
debugLog(`cell`, `value: ${state}`)
state = value
debugLog(`cell ${id}`, `updated ${state}`)
pub(downstreams, state)
updates.pub()
}
})

const sink = (subscriber: Subject<T>) => {
subscriber.send(state)
return sub(downstreams, subscriber)
}

return {get, send, sink}
}
const send = (value: T) => withUpdates(performUpdate, value)

export type CellLike<T> = {
get: () => T
sink: (subscriber: Subject<T>) => Cancel
const sink = (subscriber: Subscriber<T>) => {
const forward = () => subscriber(get())
return updates.sub(() => withReads(forward))
}

return {
get,
send,
[__updates__]: updates.sub,
sink
}
}

export const createComputed = <T>(
upstreams: Array<CellLike<any>>,
compute: (...values: Array<any>) => T
calc: (...values: Array<any>) => T
) => {
const id = cid()
const downstreams = new Set<Subject<T>>()
const updates = createPublisher<void>()

const recompute = (): T => compute(...upstreams.map(sample))
const recompute = (): T => calc(...upstreams.map(sample))

let isDirty = false
let state = recompute()

const get = () => state
const performUpdate = () => {
debugLog(`computed`, `mark dirty`)
isDirty = true
updates.pub()
}

const unsubscribe = combineUnsubscribes(
upstreams.map(cell => cell[__updates__](performUpdate))
)

const subject = {
send: batched(_ => {
const get = () => {
if (isDirty) {
state = recompute()
debugLog(`computed ${id}`, `recomputed ${state}`)
pub(downstreams, state)
})
debugLog(`computed`, `recomputed state: ${state}`)
isDirty = false
}
return state
}

const cancel = combineCancels(upstreams.map(cell => cell.sink(subject)))

const sink = (
subscriber: Subject<T>
) => {
subscriber.send(state)
return sub(downstreams, subscriber)
const sink = (subscriber: Subscriber<T>) => {
const forward = () => subscriber(get())
return updates.sub(() => withReads(forward))
}

return {get, sink, cancel}
return {
get,
[__updates__]: updates.sub,
sink,
unsubscribe: unsubscribe
}
}

/**
* "Hold" the latest value from a stream in a cell
* @param stream - the stream to update cell
* @param initial - the initial value for the cell
* @returns cell
*/
export const hold = <T>(
stream: Sink<T>,
initial: T
) => {
const cell = createCell(initial)
const cancel = stream.sink(cell)
return {get: cell.get, sink: cell.sink, cancel}
}
export const hold = <T>(stream: ReadStream<T>, initial: T) => {
const {get, [__updates__]: updates, sink, send} = createCell(initial)
const unsubscribe = stream.sink((value: T) => send(value))
return {get, [__updates__]: updates, sink, unsubscribe}
}
Loading