Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 25 additions & 8 deletions packages/runner/src/cell.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,14 @@ import { ContextualFlowControl } from "./cfc.ts";
declare module "@commontools/api" {
interface Cell<T> {
get(): Readonly<T>;
set(value: Cellify<T> | T): void;
send(value: Cellify<T> | T): void;
set(
value: Cellify<T> | T,
onCommit?: (tx: IExtendedStorageTransaction) => void,
): void;
send(
value: Cellify<T> | T,
onCommit?: (tx: IExtendedStorageTransaction) => void,
): void;
update<V extends Cellify<Partial<T> | Partial<T>>>(
values: V extends object ? V : never,
): void;
Expand Down Expand Up @@ -249,7 +255,7 @@ declare module "@commontools/api" {
}

interface Stream<T> {
send(event: T): void;
send(event: T, onCommit?: (tx: IExtendedStorageTransaction) => void): void;
sink(callback: (event: Readonly<T>) => Cancel | undefined | void): Cancel;
sync(): Promise<Stream<T>> | Stream<T>;
getRaw(options?: IReadOptions): any;
Expand Down Expand Up @@ -350,11 +356,11 @@ class StreamCell<T> implements Stream<T> {
return this.link.rootSchema;
}

send(event: T): void {
send(event: T, onCommit?: (tx: IExtendedStorageTransaction) => void): void {
event = convertCellsToLinks(event) as T;

// Use runtime from doc if available
this.runtime.scheduler.queueEvent(this.link, event);
this.runtime.scheduler.queueEvent(this.link, event, undefined, onCommit);

this.cleanup?.();
const [cancel, addCancel] = useCancelGroup();
Expand Down Expand Up @@ -432,7 +438,10 @@ export class RegularCell<T> implements Cell<T> {
return validateAndTransform(this.runtime, this.tx, this.link, this.synced);
}

set(newValue: Cellify<T> | T): void {
set(
newValue: Cellify<T> | T,
onCommit?: (tx: IExtendedStorageTransaction) => void,
): void {
if (!this.tx) throw new Error("Transaction required for set");

// No await for the sync, just kicking this off, so we have the data to
Expand All @@ -447,10 +456,18 @@ export class RegularCell<T> implements Cell<T> {
newValue,
getTopFrame()?.cause,
);

// Register commit callback if provided
if (onCommit) {
this.tx.addCommitCallback(onCommit);
}
}

send(newValue: Cellify<T> | T): void {
this.set(newValue);
send(
newValue: Cellify<T> | T,
onCommit?: (tx: IExtendedStorageTransaction) => void,
): void {
this.set(newValue, onCommit);
}

update<V extends Cellify<Partial<T> | Partial<T>>>(
Expand Down
7 changes: 6 additions & 1 deletion packages/runner/src/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,12 @@ export interface IScheduler {
unsubscribe(action: Action): void;
onConsole(fn: ConsoleHandler): void;
onError(fn: ErrorHandler): void;
queueEvent(eventRef: NormalizedFullLink, event: any): void;
queueEvent(
eventRef: NormalizedFullLink,
event: any,
retries?: number,
onCommit?: (tx: IExtendedStorageTransaction) => void,
): void;
addEventHandler(handler: EventHandler, ref: NormalizedFullLink): Cancel;
runningPromise: Promise<unknown> | undefined;
}
Expand Down
25 changes: 22 additions & 3 deletions packages/runner/src/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,11 @@ const DEFAULT_RETRIES_FOR_EVENTS = 5;
const MAX_RETRIES_FOR_REACTIVE = 10;

export class Scheduler implements IScheduler {
private eventQueue: { action: Action; retriesLeft: number }[] = [];
private eventQueue: {
action: Action;
retriesLeft: number;
onCommit?: (tx: IExtendedStorageTransaction) => void;
}[] = [];
private eventHandlers: [NormalizedFullLink, EventHandler][] = [];

private pending = new Set<Action>();
Expand Down Expand Up @@ -313,13 +317,15 @@ export class Scheduler implements IScheduler {
eventLink: NormalizedFullLink,
event: any,
retries: number = DEFAULT_RETRIES_FOR_EVENTS,
onCommit?: (tx: IExtendedStorageTransaction) => void,
): void {
for (const [link, handler] of this.eventHandlers) {
if (areNormalizedLinksSame(link, eventLink)) {
this.queueExecution();
this.eventQueue.push({
action: (tx: IExtendedStorageTransaction) => handler(tx, event),
retriesLeft: retries,
onCommit,
});
}
}
Expand Down Expand Up @@ -480,7 +486,7 @@ export class Scheduler implements IScheduler {
// the topological sort in the right way.
const event = this.eventQueue.shift();
if (event) {
const { action, retriesLeft } = event;
const { action, retriesLeft, onCommit } = event;
this.runtime.telemetry.submit({
type: "scheduler.invocation",
handler: action,
Expand All @@ -496,10 +502,23 @@ export class Scheduler implements IScheduler {
// enough, especially for a series of event that act on the same
// conflicting data.
if (error && retriesLeft > 0) {
this.eventQueue.unshift({ action, retriesLeft: retriesLeft - 1 });
this.eventQueue.unshift({
action,
retriesLeft: retriesLeft - 1,
onCommit,
});
// Ensure the re-queued event gets processed even if the scheduler
// finished this cycle before the commit completed.
this.queueExecution();
} else if (onCommit) {
// Call commit callback when:
// - Commit succeeds (!error), OR
// - Commit fails but we're out of retries (retriesLeft === 0)
try {
onCommit(tx);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think it makes sense to pass error to the onCommit function?
onCommit could have an optional additional parameter for it.
However, that may be exposing more runtime information to patterns than we want.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think patterns will be allowed to 'see' this signature anyway? We could have ANOTHER optional callback for errors but that's starting to feel bloated to me.

} catch (callbackError) {
logger.error("Error in event commit callback:", callbackError);
}
}
});
}
Expand Down
38 changes: 37 additions & 1 deletion packages/runner/src/storage/extended-storage-transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ const logResult = (
};

export class ExtendedStorageTransaction implements IExtendedStorageTransaction {
private commitCallbacks = new Set<
(tx: IExtendedStorageTransaction) => void
>();

constructor(public tx: IStorageTransaction) {}

get journal(): ITransactionJournal {
Expand Down Expand Up @@ -175,6 +179,38 @@ export class ExtendedStorageTransaction implements IExtendedStorageTransaction {
}

commit(): Promise<Result<Unit, CommitError>> {
return this.tx.commit();
const promise = this.tx.commit();

// Call commit callbacks after commit completes (success or failure) Note
// that promise always resolves, even if the commit fails, in which case it
// passes an error message as result. An exception here would be an internal
// error that should propagate.
promise.then((_result) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we capture commitCallbacks first?
I think as-is, if we add a commitCallback after this call of commit, it will be invoked, and if we remove one, it won't be invoked, which might be unexpected.
We can also avoid the .then if there are no callbacks in that case.

// Call all callbacks, wrapping each in try/catch to prevent one
// failing callback from breaking others
for (const callback of this.commitCallbacks) {
try {
callback(this);
} catch (error) {
logger.error("Error in commit callback:", error);
}
}
});

return promise;
}

/**
* Add a callback to be called when the transaction commit completes.
* The callback receives the transaction as a parameter and is called
* regardless of whether the commit succeeded or failed.
*
* Note: Callbacks are called synchronously after commit completes.
* If a callback throws, the error is logged but doesn't affect other callbacks.
*
* @param callback - Function to call after commit
*/
addCommitCallback(callback: (tx: IExtendedStorageTransaction) => void): void {
this.commitCallbacks.add(callback);
}
}
12 changes: 12 additions & 0 deletions packages/runner/src/storage/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,18 @@ export interface IStorageTransaction {
export interface IExtendedStorageTransaction extends IStorageTransaction {
tx: IStorageTransaction;

/**
* Add a callback to be called when the transaction commit completes.
* The callback receives the transaction as a parameter and is called
* regardless of whether the commit succeeded or failed.
*
* Note: Callbacks are called synchronously after commit completes.
* If a callback throws, the error is logged but doesn't affect other callbacks.
*
* @param callback - Function to call after commit
*/
addCommitCallback(callback: (tx: IExtendedStorageTransaction) => void): void;

/**
* Reads a value from a (local) memory address and throws on error, except for
* `NotFoundError` which is returned as undefined.
Expand Down
Loading