From 761ae0c5fd4b0e207d5aad83ccc59236c251536d Mon Sep 17 00:00:00 2001 From: Irakli Gozalishvili Date: Mon, 31 Mar 2025 11:53:11 -0700 Subject: [PATCH 1/2] fix: commit subscription --- memory/consumer.ts | 65 +++++++++++++++++++---------- memory/interface.ts | 7 +--- memory/memory.ts | 2 +- memory/provider.ts | 8 ++-- memory/subscription.ts | 79 ++++++++++++++++++++++++------------ memory/test/consumer-test.ts | 65 +++++++++++++++++++++++++++++ 6 files changed, 170 insertions(+), 56 deletions(-) diff --git a/memory/consumer.ts b/memory/consumer.ts index e69770ce9..2c95e1370 100644 --- a/memory/consumer.ts +++ b/memory/consumer.ts @@ -5,13 +5,13 @@ import { Changes, Clock, Command, + Commit, ConnectionError, ConsumerCommandFor, ConsumerCommandInvocation, ConsumerEffectFor, ConsumerInvocationFor, ConsumerResultFor, - ConsumerSession, DID, Entity, InferOf, @@ -46,6 +46,7 @@ import { toStringStream } from "./ucan.ts"; import { fromStringStream } from "./receipt.ts"; import * as Settings from "./settings.ts"; export * from "./interface.ts"; +import { the as commitType } from "./commit.ts"; export { ChangesBuilder }; export const connect = ({ @@ -456,28 +457,50 @@ class QuerySubscriptionInvocation< await unsubscribe; } - override perform(transaction: Transaction) { - const selection = this.selection[this.sub as MemorySpace as Space]; + override perform(commit: Commit) { + const selection = this.selection[this.space]; // Here we will collect subset of changes that match the query. let differential = null; - for (const [of, attributes] of Object.entries(transaction.args.changes)) { - for (const [the, changes] of Object.entries(attributes)) { - const [[cause, change]] = Object.entries(changes); - if (change !== true) { - const state = Object.entries(selection?.[of as Entity]?.[the] ?? {}); - const [current] = state.length > 0 ? state[0] : []; - if (cause !== current) { - for (const pattern of this.patterns) { - const match = (!pattern.of || pattern.of === of) && - (!pattern.the || pattern.the === the) && - (!pattern.cause || pattern.cause === cause); - - if (match) { - differential = differential ?? {}; - ChangesBuilder.set(differential, [of], the, { - [cause]: change, - }); + const at = this.space; + const the = commitType; + const revisions = Object.entries(commit[at][the] ?? {}); + + for (const [cause, change] of revisions) { + const { is: { transaction, since } } = change; + for (const pattern of this.patterns) { + const match = (!pattern.of || pattern.of === at) && + (!pattern.the || pattern.the === the) && + (!pattern.cause || pattern.cause === cause); + + if (match) { + differential = differential ?? {}; + ChangesBuilder.set(differential, [at], the, { + [cause]: change, + }); + } + } + + for (const [of, attributes] of Object.entries(transaction.args.changes)) { + for (const [the, changes] of Object.entries(attributes)) { + const [[cause, change]] = Object.entries(changes); + if (change !== true) { + const state = Object.entries( + selection?.[of as Entity]?.[the] ?? {}, + ); + const [current] = state.length > 0 ? state[0] : []; + if (cause !== current) { + for (const pattern of this.patterns) { + const match = (!pattern.of || pattern.of === of) && + (!pattern.the || pattern.the === the) && + (!pattern.cause || pattern.cause === cause); + + if (match) { + differential = differential ?? {}; + ChangesBuilder.set(differential, [of], the, { + [cause]: change, + }); + } } } } @@ -487,7 +510,7 @@ class QuerySubscriptionInvocation< if (differential) { this.query.integrate(differential); - this.integrate({ [transaction.sub]: differential } as Selection); + this.integrate({ [this.space]: differential } as Selection); } return { ok: {} }; diff --git a/memory/interface.ts b/memory/interface.ts index 66d7d71a9..e2401420e 100644 --- a/memory/interface.ts +++ b/memory/interface.ts @@ -149,7 +149,7 @@ export type Protocol = { source: Subscribe["args"], ): Task< Result, - Transaction + Commit >; unsubscribe( source: Unsubscribe["args"], @@ -231,9 +231,6 @@ export type Method< }; }; -type PC = ProviderCommand; -type CCI = ConsumerCommandInvocation; - export type InferOf = keyof T extends DID ? keyof T : never; /** @@ -460,7 +457,7 @@ export interface MemorySession } export interface Subscriber { - transact(transaction: Transaction): AwaitResult; + commit(commit: Commit): AwaitResult; close(): AwaitResult; } diff --git a/memory/memory.ts b/memory/memory.ts index 645c50e55..e0ad3b7a6 100644 --- a/memory/memory.ts +++ b/memory/memory.ts @@ -157,7 +157,7 @@ export const transact = async (session: Session, transaction: Transaction) => { const promises = []; for (const subscriber of session.subscribers) { - promises.push(subscriber.transact(transaction)); + promises.push(subscriber.commit(result.ok)); } await Promise.all(promises); diff --git a/memory/provider.ts b/memory/provider.ts index 7a2d70b55..2c18e8666 100644 --- a/memory/provider.ts +++ b/memory/provider.ts @@ -3,6 +3,7 @@ import type { AsyncResult, Await, CloseResult, + Commit, ConnectionError, ConsumerCommandInvocation, ConsumerInvocationFor, @@ -88,6 +89,7 @@ class MemoryProvider< fetch(request: Request) { return fetch(this, request); } + session(): ProviderSession { const session = new MemoryProviderSession( this.memory, @@ -251,9 +253,9 @@ class MemoryProviderSession< } } - transact(transaction: Transaction) { + commit(commit: Commit) { for (const [id, channels] of this.channels) { - if (Subscription.match(transaction, channels)) { + if (Subscription.match(commit, channels)) { // Note that we intentionally exit on the first match because we do not // want to send same transaction multiple times to the same consumer. // Consumer does it's own bookkeeping of all the subscriptions and will @@ -261,7 +263,7 @@ class MemoryProviderSession< return this.perform({ the: "task/effect", of: id, - is: transaction, + is: commit, }); } } diff --git a/memory/subscription.ts b/memory/subscription.ts index bbfdd4060..c23e7544a 100644 --- a/memory/subscription.ts +++ b/memory/subscription.ts @@ -1,64 +1,88 @@ import { Cause, + Commit, Entity, MemorySpace, Selector, The, - Transaction, } from "./interface.ts"; +import { the as commitType } from "./commit.ts"; -export const match = (transaction: Transaction, watched: Set) => { - for (const [of, attributes] of Object.entries(transaction.args.changes)) { - for (const [the, changes] of Object.entries(attributes)) { - for (const change of Object.values(changes)) { - // If `change == true` we simply confirm that state has not changed - // so we don't need to notify those subscribers. - if (change !== true) { - const watches = - watched.has(formatAddress(transaction.sub, { the, of })) || - watched.has(formatAddress(transaction.sub, { the })) || - watched.has(formatAddress(transaction.sub, { of })) || - watched.has(formatAddress(transaction.sub, {})); +export const match = (commit: Commit, watched: Set) => { + for (const at of Object.keys(commit) as MemorySpace[]) { + const changes = commit[at][commitType] ?? {}; + for ( + const [cause, { is: { transaction, since } }] of Object.entries(changes) + ) { + // If commit on this space are watched we have a match + if (matchAddress(watched, { the: commitType, of: at, at })) { + return true; + } - if (watches) { - return true; + // Otherwise we consider individual in the commit transaction to figure + // out if we have a match. + for (const [of, attributes] of Object.entries(transaction.args.changes)) { + for (const [the, changes] of Object.entries(attributes)) { + for (const change of Object.values(changes)) { + // If `change == true` we simply confirm that state has not changed + // so we don't need to notify those subscribers. + if ( + change !== true && + matchAddress(watched, { at: transaction.sub, the, of }) + ) { + return true; + } } } } } } + return false; }; +const matchAddress = ( + watched: Set, + { at, the, of }: { the: string; of: string; at: MemorySpace }, +) => + watched.has(formatAddress({ at, the, of })) || + watched.has(formatAddress({ at, the })) || + watched.has(formatAddress({ at, of })) || + watched.has(formatAddress({ at })); + +export const ANY = "_"; + export const channels = function* (space: MemorySpace, selector: Selector) { - const all = [["_", {}]] as const; + const all = [[ANY, {}]] as const; const entities = Object.entries(selector); for (const [of, attributes] of entities.length > 0 ? entities : all) { const selector = Object.entries(attributes); for (const [the] of selector.length > 0 ? selector : all) { - yield formatAddress(space, { the, of }); + yield formatAddress({ at: space, the, of }); } } }; export const fromSelector = function* (selector: Selector) { - const all = [[undefined, {}]] as const; + const all = [[ANY, {}]] as const; const entities = Object.entries(selector); for (const [of, attributes] of entities.length > 0 ? entities : all) { const selector = Object.entries(attributes); for (const [the, members] of selector.length > 0 ? selector : all) { - const selector = Object.entries(members); + const selector = Object.keys(members); for ( - const cause of selector.length > 0 ? Object.keys(selector) : [undefined] + const cause of selector.length > 0 ? selector : [ANY] ) { const selector: { of?: Entity; the?: The; cause?: Cause } = {}; - if (of) { + if (of !== ANY) { selector.of = of as Entity; } - if (the) { + + if (the !== ANY) { selector.the = the as The; } - if (cause) { + + if (cause !== ANY) { selector.cause = cause as Cause; } yield selector; @@ -68,6 +92,9 @@ export const fromSelector = function* (selector: Selector) { }; export const formatAddress = ( - space: MemorySpace, - { of = "_", the = "_" }: { the?: string; of?: string }, -) => `watch:///${space}/${of}/${the}`; + { at = "_", of = "_", the = "_" }: { + at?: string; + the?: string; + of?: string; + }, +) => `watch:///${at}/${of}/${the}`; diff --git a/memory/test/consumer-test.ts b/memory/test/consumer-test.ts index ffa89a071..c128db24a 100644 --- a/memory/test/consumer-test.ts +++ b/memory/test/consumer-test.ts @@ -560,3 +560,68 @@ test("can not transact unauthorized space", store, async (session) => { ), ); }); + +test("subscribe to commits", store, async (session) => { + const clock = new Clock(); + const memory = Consumer.open({ as: alice, session, clock }) + .mount(alice.did()); + + const v1 = Fact.assert({ + the: "application/json", + of: doc, + is: { v: 1 }, + }); + + const r1 = await memory.transact({ + changes: Changes.from([v1]), + }); + assert(r1.ok); + + const v2 = Fact.assert({ + the: "application/json", + of: doc, + is: { v: 2 }, + cause: v1, + }); + + const r2 = await memory.transact({ + changes: Changes.from([v2]), + }); + + assert(r2.ok); + + const query = memory.query({ + select: { + [alice.did()]: { + "application/commit+json": { + _: {}, + }, + }, + }, + }); + + const subscription = query.subscribe(); + const reader = subscription.getReader(); + const p1 = reader.read(); + + const v3 = Fact.assert({ + the: "application/json", + of: doc, + is: { v: 3 }, + cause: v2, + }); + + const r3 = await memory.transact({ + changes: Changes.from([v3]), + }); + assert(r3.ok); + + assertEquals(await p1, { + done: false, + value: { + [alice.did()]: r3.ok, + }, + }); + + reader.cancel(); +}); From 1889b4f210c841fc382fbb290abd162f6e2a8e50 Mon Sep 17 00:00:00 2001 From: Irakli Gozalishvili Date: Mon, 31 Mar 2025 14:12:30 -0700 Subject: [PATCH 2/2] fix: task type inference --- jumble/src/components/View.tsx | 60 ++++++++++++++++++++++++++-------- 1 file changed, 46 insertions(+), 14 deletions(-) diff --git a/jumble/src/components/View.tsx b/jumble/src/components/View.tsx index de4972853..b139dc5fd 100644 --- a/jumble/src/components/View.tsx +++ b/jumble/src/components/View.tsx @@ -17,10 +17,16 @@ export type FX = | Send | Spawn | Perform | Wait>; -export interface Behavior { - init(): Task, Model>; +export type InferCommand> = Effect extends + Send ? Command + : Effect extends Spawn + ? (Inner extends Send ? Command : never) + : never; + +export interface Behavior> { + init(): Task; - update: (model: Model, command: Command) => Task, Model>; + update: (model: Model, command: InferCommand) => Task; } export interface Effect { @@ -31,8 +37,8 @@ export interface Controller { dispatch(command: Command): () => void; } -export interface Subscriber { - (self: Service): View; +export interface Subscriber, View> { + (self: Service): View; } function* test() { @@ -116,16 +122,21 @@ export interface Execution< [Symbol.iterator](): Execution; } -export const service = (behavior: Behavior) => - new Service(behavior); +export const service = >( + behavior: Behavior, +) => new Service(behavior); export type InferSend = Effect extends Send ? Command : never; -class Service { +class Service< + Model, + Effect extends FX, + Command extends InferCommand = InferCommand, +> { state!: Model; - subscribers: Set> | undefined; + subscribers: Set, unknown>> | undefined; inbox: Command[] = []; queue: FX[] = []; @@ -133,7 +144,7 @@ class Service { idle: boolean = true; constructor( - public behavior: Behavior, + public behavior: Behavior, ) { } @@ -184,7 +195,7 @@ class Service { } } - advance(task: Task, Model>) { + advance(task: Task) { const work = task[Symbol.iterator](); while (true) { const step = work.next(); @@ -192,7 +203,7 @@ class Service { this.state = step.value; return this.state; } else { - this.queue.push(step.value); + this.queue.push(step.value as FX); } } } @@ -210,7 +221,7 @@ class Service { terminate() { } - subscribe(subscriber: Subscriber) { + subscribe(subscriber: Subscriber) { if (!this.subscribers) { this.subscribers = new Set([subscriber]); this.advance(this.behavior.init()); @@ -220,7 +231,7 @@ class Service { } } render(view: (state: Model, controller: Controller) => View) { - const [process, advance] = useState<[Service]>(); + const [process, advance] = useState<[Service]>(); const [ui, setUI] = useState(null); useEffect( @@ -255,6 +266,7 @@ class Service { // * ```js // * const Counter = View({ // * init() { +// * yield // * return { state: { count: 0 } }; // * }, // * update({ count }: { count: number }, command: "inc" | "dec") { @@ -296,3 +308,23 @@ class Service { export { Service as Process }; export default Service; + +const initialize = function* () { + const response = yield* wait(fetch("https://help.me")); + + // yield 1; + + yield* send({ hello: "world" } as const); + + yield* send({ test: "hello", body: response } as const); +}; + +export const example = new Service({ + *init() { + yield* spawn(initialize); + return { status: "init" }; + }, + *update(state, message) { + return state; + }, +});