-
Notifications
You must be signed in to change notification settings - Fork 9
feat(runner): Add onCommit callback support for Cell and Stream operations #1879
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(runner): Add onCommit callback support for Cell and Stream operations #1879
Conversation
This change adds the ability to register callbacks that are invoked when Cell.set(), Cell.send(), or Stream.send() operations commit to storage, regardless of whether the commit succeeds or fails. ## What Changed ### API Changes - Added optional `onCommit` parameter to: - `Cell.set(value, onCommit?)` - `Cell.send(value, onCommit?)` - `Stream.send(event, onCommit?)` - `IScheduler.queueEvent(eventRef, event, retries?, onCommit?)` - Added `IExtendedStorageTransaction.addCommitCallback(callback)` method ### Callback Behavior - **For Cell operations**: Callback is invoked after the transaction commits, whether the commit succeeds or fails. Multiple set() calls on the same transaction can each register callbacks - all are invoked on commit. - **For Stream events**: Callback is invoked when either: - The event handler commits successfully, OR - The event handler fails and all retry attempts are exhausted - Callbacks are NOT invoked during intermediate retry attempts - **Error handling**: Each callback is wrapped in try/catch to prevent one failing callback from breaking others. Errors are logged but don't affect transaction outcome. - **Inspecting results**: Callbacks receive the transaction as a parameter and can call `tx.status()` to determine if the commit succeeded or failed. ### Implementation Details - `ExtendedStorageTransaction` maintains a Set of callbacks and invokes them in its commit().then() handler after the commit completes - `Scheduler.execute()` calls the callback when commit succeeds OR when the final retry fails (retriesLeft === 0) - All callbacks are invoked synchronously within the microtask queue after commit resolution ## Why This Matters This feature enables: - Tracking completion of writes for UI updates or synchronization - Implementing retry logic or error handling at the call site - Coordinating dependent operations across transaction boundaries - Debugging and observability of storage operations The callbacks provide a way to observe transaction outcomes without blocking the synchronous flow of Cell/Stream operations, maintaining the existing non-blocking API while adding observability. ## Testing - Added 7 new tests covering success cases, failure cases, error handling, multiple callbacks, and retry behavior - All existing tests continue to pass (backward compatible) - Total: 79 tests passing 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No issues found across 7 files
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR adds optional onCommit callback support to Cell and Stream operations, enabling observation of transaction completion without blocking synchronous operations. The callbacks are invoked after storage commits, regardless of success or failure.
- Added onCommit parameter to Cell.set(), Cell.send(), Stream.send(), and Scheduler.queueEvent()
- Implemented commit callback infrastructure in ExtendedStorageTransaction
- Added comprehensive test coverage for success/failure scenarios and error handling
Reviewed Changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| packages/runner/src/storage/interface.ts | Added addCommitCallback method to IExtendedStorageTransaction interface |
| packages/runner/src/storage/extended-storage-transaction.ts | Implemented commit callback storage and execution logic |
| packages/runner/src/runtime.ts | Updated IScheduler.queueEvent signature to include onCommit parameter |
| packages/runner/src/scheduler.ts | Added onCommit callback handling to event queue and execution logic |
| packages/runner/src/cell.ts | Added onCommit parameter to Cell.set(), Cell.send(), and Stream.send() methods |
| packages/runner/test/cell.test.ts | Added comprehensive tests for Cell commit callbacks |
| packages/runner/test/scheduler.test.ts | Added comprehensive tests for Stream event commit callbacks |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
…, but instead an error code is passed.
| // that promise always resolves, even if the commit fails, in which case it | ||
| // passes an error message as result. An exception here would be an internal | ||
| // error that should propagate. | ||
| promise.then((_result) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we capture commitCallbacks first?
I think as-is, if we add a commitCallback after this call of commit, it will be invoked, and if we remove one, it won't be invoked, which might be unexpected.
We can also avoid the .then if there are no callbacks in that case.
| // - Commit succeeds (!error), OR | ||
| // - Commit fails but we're out of retries (retriesLeft === 0) | ||
| try { | ||
| onCommit(tx); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Think it makes sense to pass error to the onCommit function?
onCommit could have an optional additional parameter for it.
However, that may be exposing more runtime information to patterns than we want.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think patterns will be allowed to 'see' this signature anyway? We could have ANOTHER optional callback for errors but that's starting to feel bloated to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok to land as is, though I think it would be better to capture commitCallbacks.
This onCommit idea looks really useful.
This change adds the ability to register callbacks that are invoked when Cell.set(), Cell.send(), or Stream.send() operations commit to storage, regardless of whether the commit succeeds or fails.
What Changed
API Changes
Added optional
onCommitparameter to:Cell.set(value, onCommit?)Cell.send(value, onCommit?)Stream.send(event, onCommit?)IScheduler.queueEvent(eventRef, event, retries?, onCommit?)Added
IExtendedStorageTransaction.addCommitCallback(callback)methodCallback Behavior
For Cell operations: Callback is invoked after the transaction commits, whether the commit succeeds or fails. Multiple set() calls on the same transaction can each register callbacks - all are invoked on commit.
For Stream events: Callback is invoked when either:
Error handling: Each callback is wrapped in try/catch to prevent one failing callback from breaking others. Errors are logged but don't affect transaction outcome.
Inspecting results: Callbacks receive the transaction as a parameter and can call
tx.status()to determine if the commit succeeded or failed.Implementation Details
ExtendedStorageTransactionmaintains a Set of callbacks and invokes them in its commit().then() handler after the commit completesScheduler.execute()calls the callback when commit succeeds OR when the final retry fails (retriesLeft === 0)Why This Matters
This feature enables:
The callbacks provide a way to observe transaction outcomes without blocking the synchronous flow of Cell/Stream operations, maintaining the existing non-blocking API while adding observability.
Testing
🤖 Generated with Claude Code
Summary by cubic
Adds optional onCommit callbacks to Cell.set/send, Stream.send, and scheduler.queueEvent to fire after storage commits. This improves observability without blocking writes.