Skip to content

Commit 8259c3c

Browse files
seefeldbclaude
andauthored
feat(runner): Add onCommit callback support for Cell and Stream operations (#1879)
* Add onCommit callback support for Cell and Stream operations This change adds the ability to register callbacks that are invoked when Cell.set(), Cell.send(), or Stream.send() operations commit to storage, regardless of whether the commit succeeds or fails. ## What Changed ### API Changes - Added optional `onCommit` parameter to: - `Cell.set(value, onCommit?)` - `Cell.send(value, onCommit?)` - `Stream.send(event, onCommit?)` - `IScheduler.queueEvent(eventRef, event, retries?, onCommit?)` - Added `IExtendedStorageTransaction.addCommitCallback(callback)` method ### Callback Behavior - **For Cell operations**: Callback is invoked after the transaction commits, whether the commit succeeds or fails. Multiple set() calls on the same transaction can each register callbacks - all are invoked on commit. - **For Stream events**: Callback is invoked when either: - The event handler commits successfully, OR - The event handler fails and all retry attempts are exhausted - Callbacks are NOT invoked during intermediate retry attempts - **Error handling**: Each callback is wrapped in try/catch to prevent one failing callback from breaking others. Errors are logged but don't affect transaction outcome. - **Inspecting results**: Callbacks receive the transaction as a parameter and can call `tx.status()` to determine if the commit succeeded or failed. ### Implementation Details - `ExtendedStorageTransaction` maintains a Set of callbacks and invokes them in its commit().then() handler after the commit completes - `Scheduler.execute()` calls the callback when commit succeeds OR when the final retry fails (retriesLeft === 0) - All callbacks are invoked synchronously within the microtask queue after commit resolution ## Why This Matters This feature enables: - Tracking completion of writes for UI updates or synchronization - Implementing retry logic or error handling at the call site - Coordinating dependent operations across transaction boundaries - Debugging and observability of storage operations The callbacks provide a way to observe transaction outcomes without blocking the synchronous flow of Cell/Stream operations, maintaining the existing non-blocking API while adding observability. ## Testing - Added 7 new tests covering success cases, failure cases, error handling, multiple callbacks, and retry behavior - All existing tests continue to pass (backward compatible) - Total: 79 tests passing 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com> * clarified in comment that commit promise isn't rejected on tx failure, but instead an error code is passed. --------- Co-authored-by: Claude <noreply@anthropic.com>
1 parent 700e46c commit 8259c3c

File tree

7 files changed

+576
-13
lines changed

7 files changed

+576
-13
lines changed

packages/runner/src/cell.ts

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,14 @@ import { ContextualFlowControl } from "./cfc.ts";
159159
declare module "@commontools/api" {
160160
interface Cell<T> {
161161
get(): Readonly<T>;
162-
set(value: Cellify<T> | T): void;
163-
send(value: Cellify<T> | T): void;
162+
set(
163+
value: Cellify<T> | T,
164+
onCommit?: (tx: IExtendedStorageTransaction) => void,
165+
): void;
166+
send(
167+
value: Cellify<T> | T,
168+
onCommit?: (tx: IExtendedStorageTransaction) => void,
169+
): void;
164170
update<V extends Cellify<Partial<T> | Partial<T>>>(
165171
values: V extends object ? V : never,
166172
): void;
@@ -249,7 +255,7 @@ declare module "@commontools/api" {
249255
}
250256

251257
interface Stream<T> {
252-
send(event: T): void;
258+
send(event: T, onCommit?: (tx: IExtendedStorageTransaction) => void): void;
253259
sink(callback: (event: Readonly<T>) => Cancel | undefined | void): Cancel;
254260
sync(): Promise<Stream<T>> | Stream<T>;
255261
getRaw(options?: IReadOptions): any;
@@ -350,11 +356,11 @@ class StreamCell<T> implements Stream<T> {
350356
return this.link.rootSchema;
351357
}
352358

353-
send(event: T): void {
359+
send(event: T, onCommit?: (tx: IExtendedStorageTransaction) => void): void {
354360
event = convertCellsToLinks(event) as T;
355361

356362
// Use runtime from doc if available
357-
this.runtime.scheduler.queueEvent(this.link, event);
363+
this.runtime.scheduler.queueEvent(this.link, event, undefined, onCommit);
358364

359365
this.cleanup?.();
360366
const [cancel, addCancel] = useCancelGroup();
@@ -432,7 +438,10 @@ export class RegularCell<T> implements Cell<T> {
432438
return validateAndTransform(this.runtime, this.tx, this.link, this.synced);
433439
}
434440

435-
set(newValue: Cellify<T> | T): void {
441+
set(
442+
newValue: Cellify<T> | T,
443+
onCommit?: (tx: IExtendedStorageTransaction) => void,
444+
): void {
436445
if (!this.tx) throw new Error("Transaction required for set");
437446

438447
// No await for the sync, just kicking this off, so we have the data to
@@ -447,10 +456,18 @@ export class RegularCell<T> implements Cell<T> {
447456
newValue,
448457
getTopFrame()?.cause,
449458
);
459+
460+
// Register commit callback if provided
461+
if (onCommit) {
462+
this.tx.addCommitCallback(onCommit);
463+
}
450464
}
451465

452-
send(newValue: Cellify<T> | T): void {
453-
this.set(newValue);
466+
send(
467+
newValue: Cellify<T> | T,
468+
onCommit?: (tx: IExtendedStorageTransaction) => void,
469+
): void {
470+
this.set(newValue, onCommit);
454471
}
455472

456473
update<V extends Cellify<Partial<T> | Partial<T>>>(

packages/runner/src/runtime.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,12 @@ export interface IScheduler {
209209
unsubscribe(action: Action): void;
210210
onConsole(fn: ConsoleHandler): void;
211211
onError(fn: ErrorHandler): void;
212-
queueEvent(eventRef: NormalizedFullLink, event: any): void;
212+
queueEvent(
213+
eventRef: NormalizedFullLink,
214+
event: any,
215+
retries?: number,
216+
onCommit?: (tx: IExtendedStorageTransaction) => void,
217+
): void;
213218
addEventHandler(handler: EventHandler, ref: NormalizedFullLink): Cancel;
214219
runningPromise: Promise<unknown> | undefined;
215220
}

packages/runner/src/scheduler.ts

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,11 @@ const DEFAULT_RETRIES_FOR_EVENTS = 5;
8282
const MAX_RETRIES_FOR_REACTIVE = 10;
8383

8484
export class Scheduler implements IScheduler {
85-
private eventQueue: { action: Action; retriesLeft: number }[] = [];
85+
private eventQueue: {
86+
action: Action;
87+
retriesLeft: number;
88+
onCommit?: (tx: IExtendedStorageTransaction) => void;
89+
}[] = [];
8690
private eventHandlers: [NormalizedFullLink, EventHandler][] = [];
8791

8892
private pending = new Set<Action>();
@@ -313,13 +317,15 @@ export class Scheduler implements IScheduler {
313317
eventLink: NormalizedFullLink,
314318
event: any,
315319
retries: number = DEFAULT_RETRIES_FOR_EVENTS,
320+
onCommit?: (tx: IExtendedStorageTransaction) => void,
316321
): void {
317322
for (const [link, handler] of this.eventHandlers) {
318323
if (areNormalizedLinksSame(link, eventLink)) {
319324
this.queueExecution();
320325
this.eventQueue.push({
321326
action: (tx: IExtendedStorageTransaction) => handler(tx, event),
322327
retriesLeft: retries,
328+
onCommit,
323329
});
324330
}
325331
}
@@ -480,7 +486,7 @@ export class Scheduler implements IScheduler {
480486
// the topological sort in the right way.
481487
const event = this.eventQueue.shift();
482488
if (event) {
483-
const { action, retriesLeft } = event;
489+
const { action, retriesLeft, onCommit } = event;
484490
this.runtime.telemetry.submit({
485491
type: "scheduler.invocation",
486492
handler: action,
@@ -496,10 +502,23 @@ export class Scheduler implements IScheduler {
496502
// enough, especially for a series of event that act on the same
497503
// conflicting data.
498504
if (error && retriesLeft > 0) {
499-
this.eventQueue.unshift({ action, retriesLeft: retriesLeft - 1 });
505+
this.eventQueue.unshift({
506+
action,
507+
retriesLeft: retriesLeft - 1,
508+
onCommit,
509+
});
500510
// Ensure the re-queued event gets processed even if the scheduler
501511
// finished this cycle before the commit completed.
502512
this.queueExecution();
513+
} else if (onCommit) {
514+
// Call commit callback when:
515+
// - Commit succeeds (!error), OR
516+
// - Commit fails but we're out of retries (retriesLeft === 0)
517+
try {
518+
onCommit(tx);
519+
} catch (callbackError) {
520+
logger.error("Error in event commit callback:", callbackError);
521+
}
503522
}
504523
});
505524
}

packages/runner/src/storage/extended-storage-transaction.ts

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ const logResult = (
4242
};
4343

4444
export class ExtendedStorageTransaction implements IExtendedStorageTransaction {
45+
private commitCallbacks = new Set<
46+
(tx: IExtendedStorageTransaction) => void
47+
>();
48+
4549
constructor(public tx: IStorageTransaction) {}
4650

4751
get journal(): ITransactionJournal {
@@ -175,6 +179,38 @@ export class ExtendedStorageTransaction implements IExtendedStorageTransaction {
175179
}
176180

177181
commit(): Promise<Result<Unit, CommitError>> {
178-
return this.tx.commit();
182+
const promise = this.tx.commit();
183+
184+
// Call commit callbacks after commit completes (success or failure) Note
185+
// that promise always resolves, even if the commit fails, in which case it
186+
// passes an error message as result. An exception here would be an internal
187+
// error that should propagate.
188+
promise.then((_result) => {
189+
// Call all callbacks, wrapping each in try/catch to prevent one
190+
// failing callback from breaking others
191+
for (const callback of this.commitCallbacks) {
192+
try {
193+
callback(this);
194+
} catch (error) {
195+
logger.error("Error in commit callback:", error);
196+
}
197+
}
198+
});
199+
200+
return promise;
201+
}
202+
203+
/**
204+
* Add a callback to be called when the transaction commit completes.
205+
* The callback receives the transaction as a parameter and is called
206+
* regardless of whether the commit succeeded or failed.
207+
*
208+
* Note: Callbacks are called synchronously after commit completes.
209+
* If a callback throws, the error is logged but doesn't affect other callbacks.
210+
*
211+
* @param callback - Function to call after commit
212+
*/
213+
addCommitCallback(callback: (tx: IExtendedStorageTransaction) => void): void {
214+
this.commitCallbacks.add(callback);
179215
}
180216
}

packages/runner/src/storage/interface.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -496,6 +496,18 @@ export interface IStorageTransaction {
496496
export interface IExtendedStorageTransaction extends IStorageTransaction {
497497
tx: IStorageTransaction;
498498

499+
/**
500+
* Add a callback to be called when the transaction commit completes.
501+
* The callback receives the transaction as a parameter and is called
502+
* regardless of whether the commit succeeded or failed.
503+
*
504+
* Note: Callbacks are called synchronously after commit completes.
505+
* If a callback throws, the error is logged but doesn't affect other callbacks.
506+
*
507+
* @param callback - Function to call after commit
508+
*/
509+
addCommitCallback(callback: (tx: IExtendedStorageTransaction) => void): void;
510+
499511
/**
500512
* Reads a value from a (local) memory address and throws on error, except for
501513
* `NotFoundError` which is returned as undefined.

0 commit comments

Comments
 (0)