From 6314a3baf2c1f60e57cd4f3e8e8f3ecb40088a51 Mon Sep 17 00:00:00 2001 From: Irakli Gozalishvili Date: Thu, 17 Jul 2025 13:06:02 -0700 Subject: [PATCH 1/5] Update StorageTransactionStatus to include error state --- packages/runner/src/storage/interface.ts | 10 +++----- packages/runner/src/storage/transaction.ts | 8 +++++-- packages/runner/test/transaction.test.ts | 28 +++++++++++----------- 3 files changed, 23 insertions(+), 23 deletions(-) diff --git a/packages/runner/src/storage/interface.ts b/packages/runner/src/storage/interface.ts index 4c116f19f..99051a548 100644 --- a/packages/runner/src/storage/interface.ts +++ b/packages/runner/src/storage/interface.ts @@ -376,15 +376,11 @@ export type IStorageTransactionProgress = Variant<{ pending: IStorageTransactionLog; done: IStorageTransactionLog; }>; -export type StorageTransactionStatus = Result< - IStorageTransactionState, - StorageTransactionFailed ->; - -export type IStorageTransactionState = +export type StorageTransactionStatus = | { status: "ready"; journal: ITransactionJournal } | { status: "pending"; journal: ITransactionJournal } - | { status: "done"; journal: ITransactionJournal }; + | { status: "done"; journal: ITransactionJournal } + | { status: "error"; journal: ITransactionJournal; error: StorageTransactionFailed }; /** * Representation of a storage transaction, which can be used to query facts and diff --git a/packages/runner/src/storage/transaction.ts b/packages/runner/src/storage/transaction.ts index f615c5d91..47cb8e09b 100644 --- a/packages/runner/src/storage/transaction.ts +++ b/packages/runner/src/storage/transaction.ts @@ -112,9 +112,13 @@ export const status = ( ): StorageTransactionStatus => { const state = use(transaction); if (state.status === "done") { - return state.result.error ? state.result : { ok: state }; + if (state.result.error) { + return { status: "error", journal: state.journal, error: state.result.error }; + } else { + return { status: "done", journal: state.journal }; + } } else { - return { ok: state }; + return { status: state.status, journal: state.journal }; } }; diff --git a/packages/runner/test/transaction.test.ts b/packages/runner/test/transaction.test.ts index 3ff9c928e..f41005944 100644 --- a/packages/runner/test/transaction.test.ts +++ b/packages/runner/test/transaction.test.ts @@ -26,8 +26,7 @@ describe("StorageTransaction", () => { describe("Basic Lifecycle", () => { it("should start with ready status", () => { const result = transaction.status(); - expect(result.ok).toBeDefined(); - expect(result.ok?.status).toBe("ready"); + expect(result.status).toBe("ready"); }); it("should create reader for a space", () => { @@ -323,10 +322,12 @@ describe("StorageTransaction", () => { expect(result.ok).toBeDefined(); const status = transaction.status(); - expect(status.error).toBeDefined(); - expect(status.error?.name).toBe("StorageTransactionAborted"); - if (status.error?.name === "StorageTransactionAborted") { - expect(status.error.reason).toBe(reason); + expect(status.status).toBe("error"); + if (status.status === "error") { + expect(status.error.name).toBe("StorageTransactionAborted"); + if (status.error.name === "StorageTransactionAborted") { + expect(status.error.reason).toBe(reason); + } } }); @@ -374,8 +375,7 @@ describe("StorageTransaction", () => { expect(result.ok).toBeDefined(); const status = transaction.status(); - expect(status.ok).toBeDefined(); - expect(status.ok?.status).toBe("done"); + expect(status.status).toBe("done"); }); it("should commit transaction with changes", async () => { @@ -418,15 +418,13 @@ describe("StorageTransaction", () => { // Check status while committing const pendingStatus = transaction.status(); - expect(pendingStatus.ok).toBeDefined(); - expect(pendingStatus.ok?.status).toBe("pending"); + expect(pendingStatus.status).toBe("pending"); await commitPromise; // Check status after commit const doneStatus = transaction.status(); - expect(doneStatus.ok).toBeDefined(); - expect(doneStatus.ok?.status).toBe("done"); + expect(doneStatus.status).toBe("done"); }); it("should fail operations after commit", async () => { @@ -515,8 +513,10 @@ describe("StorageTransaction", () => { // Verify transaction status shows failure const status = freshTransaction.status(); - expect(status.error).toBeDefined(); - expect(status.error?.name).toBe("StorageTransactionInconsistent"); + expect(status.status).toBe("error"); + if (status.status === "error") { + expect(status.error.name).toBe("StorageTransactionInconsistent"); + } }); }); From 30efcb3c9dc4751645211317348ff6322657d5aa Mon Sep 17 00:00:00 2001 From: Irakli Gozalishvili Date: Fri, 18 Jul 2025 08:29:28 -0700 Subject: [PATCH 2/5] refactor transaction status removing incopatibility with ship --- .../runner/src/builtins/compile-and-run.ts | 9 ++-- packages/runner/src/builtins/fetch-data.ts | 6 ++- packages/runner/src/builtins/llm.ts | 12 ++++-- packages/runner/src/builtins/stream-data.ts | 3 +- packages/runner/src/query-result-proxy.ts | 3 +- packages/runner/src/runner.ts | 2 +- packages/runner/src/schema.ts | 2 +- packages/runner/src/storage/interface.ts | 42 +++++++------------ .../runner/src/storage/transaction-shim.ts | 42 +++++++++++++++---- .../test/storage-transaction-shim.test.ts | 5 +-- 10 files changed, 73 insertions(+), 53 deletions(-) diff --git a/packages/runner/src/builtins/compile-and-run.ts b/packages/runner/src/builtins/compile-and-run.ts index c108fcf84..8dd2b349f 100644 --- a/packages/runner/src/builtins/compile-and-run.ts +++ b/packages/runner/src/builtins/compile-and-run.ts @@ -164,7 +164,8 @@ export function compileAndRun( // All this code runside outside the original action, and the // transaction above might have closed by the time this is called. If // so, we create a new one to set the error. - const asyncTx = tx.status().ok?.open ? tx : runtime.edit(); + const status = tx.status(); + const asyncTx = status.status === "ready" ? tx : runtime.edit(); // Extract structured errors if this is a CompilerError if (err instanceof CompilerError) { @@ -189,7 +190,8 @@ export function compileAndRun( // All this code runside outside the original action, and the // transaction above might have closed by the time this is called. If // so, we create a new one to set the status. - const asyncTx = tx.status().ok?.open ? tx : runtime.edit(); + const status = tx.status(); + const asyncTx = status.status === "ready" ? tx : runtime.edit(); pendingWithLog.withTx(asyncTx).set(false); if (asyncTx !== tx) asyncTx.commit(); }); @@ -204,7 +206,8 @@ export function compileAndRun( // All this code runside outside the original action, and the // transaction above might have closed by the time this is called. If // so, we create a new one to start the charm. - const asyncTx = tx.status().ok?.open ? tx : runtime.edit(); + const status = tx.status(); + const asyncTx = status.status === "ready" ? tx : runtime.edit(); await runtime.runSynced(result.withTx(asyncTx), recipe, input.get()); if (asyncTx !== tx) asyncTx.commit(); } diff --git a/packages/runner/src/builtins/fetch-data.ts b/packages/runner/src/builtins/fetch-data.ts index 3e37468fb..493736381 100644 --- a/packages/runner/src/builtins/fetch-data.ts +++ b/packages/runner/src/builtins/fetch-data.ts @@ -129,7 +129,8 @@ export function fetchData( // All this code runside outside the original action, and the // transaction above might have closed by the time this is called. If // so, we create a new one to set the result. - const asyncTx = tx.status().ok?.open ? tx : runtime.edit(); + const status = tx.status(); + const asyncTx = status.status === "ready" ? tx : runtime.edit(); pendingWithLog.withTx(asyncTx).set(false); resultWithLog.withTx(asyncTx).set(data); @@ -145,7 +146,8 @@ export function fetchData( // All this code runside outside the original action, and the // transaction above might have closed by the time this is called. If // so, we create a new one to set the error. - const asyncTx = tx.status().ok?.open ? tx : runtime.edit(); + const status = tx.status(); + const asyncTx = status.status === "ready" ? tx : runtime.edit(); pendingWithLog.withTx(asyncTx).set(false); errorWithLog.withTx(asyncTx).set(err); diff --git a/packages/runner/src/builtins/llm.ts b/packages/runner/src/builtins/llm.ts index 104e46560..93c48bf3a 100644 --- a/packages/runner/src/builtins/llm.ts +++ b/packages/runner/src/builtins/llm.ts @@ -158,7 +158,8 @@ export function llm( // All this code runside outside the original action, and the // transaction above might have closed by the time this is called. If // so, we create a new one to set the result. - const asyncTx = tx.status().ok?.open ? tx : runtime.edit(); + const status = tx.status(); + const asyncTx = status.status === "ready" ? tx : runtime.edit(); pendingWithLog.withTx(asyncTx).set(false); resultWithLog.withTx(asyncTx).set(text); @@ -177,7 +178,8 @@ export function llm( // All this code runside outside the original action, and the // transaction above might have closed by the time this is called. If // so, we create a new one to set the result. - const asyncTx = tx.status().ok?.open ? tx : runtime.edit(); + const status = tx.status(); + const asyncTx = status.status === "ready" ? tx : runtime.edit(); pendingWithLog.withTx(asyncTx).set(false); resultWithLog.withTx(asyncTx).set(undefined); @@ -330,7 +332,8 @@ export function generateObject>( // All this code runside outside the original action, and the // transaction above might have closed by the time this is called. If // so, we create a new one to set the result. - const asyncTx = tx.status().ok?.open ? tx : runtime.edit(); + const status = tx.status(); + const asyncTx = status.status === "ready" ? tx : runtime.edit(); pendingWithLog.withTx(asyncTx).set(false); resultWithLog.withTx(asyncTx).set(response.object); @@ -348,7 +351,8 @@ export function generateObject>( // All this code runside outside the original action, and the // transaction above might have closed by the time this is called. If // so, we create a new one to set the result. - const asyncTx = tx.status().ok?.open ? tx : runtime.edit(); + const status = tx.status(); + const asyncTx = status.status === "ready" ? tx : runtime.edit(); pendingWithLog.withTx(asyncTx).set(false); resultWithLog.withTx(asyncTx).set({} as any); // FIXME(ja): setting result to undefined causes a storage conflict diff --git a/packages/runner/src/builtins/stream-data.ts b/packages/runner/src/builtins/stream-data.ts index 445a5bedb..540369c6b 100644 --- a/packages/runner/src/builtins/stream-data.ts +++ b/packages/runner/src/builtins/stream-data.ts @@ -181,7 +181,8 @@ export function streamData( await runtime.idle(); - const asyncTx = tx.status().ok?.open ? tx : runtime.edit(); + const status = tx.status(); + const asyncTx = status.status === "ready" ? tx : runtime.edit(); pendingWithLog.withTx(asyncTx).set(false); resultWithLog.withTx(asyncTx).set(undefined); errorWithLog.withTx(asyncTx).set(e); diff --git a/packages/runner/src/query-result-proxy.ts b/packages/runner/src/query-result-proxy.ts index 8a464d5a7..da48db81b 100644 --- a/packages/runner/src/query-result-proxy.ts +++ b/packages/runner/src/query-result-proxy.ts @@ -77,7 +77,8 @@ export function createQueryResultProxy( } // Resolve path and follow links to actual value. - const readTx = tx?.status().ok?.open ? tx : runtime.edit(); + const txStatus = tx?.status(); + const readTx = (txStatus?.status === "ready" && tx) ? tx : runtime.edit(); link = resolveLinkToValue(readTx, link); const target = readTx.readValueOrThrow(link) as any; diff --git a/packages/runner/src/runner.ts b/packages/runner/src/runner.ts index 50e93511a..ba77de7be 100644 --- a/packages/runner/src/runner.ts +++ b/packages/runner/src/runner.ts @@ -287,7 +287,7 @@ export class Runner implements IRunner { // scheduler if the transaction isn't committed before the first functions // run. Though most likely the worst case is just extra invocations. this.run( - resultCell.tx?.status().ok?.open ? resultCell.tx : undefined, + resultCell.tx?.status().status === "ready" ? resultCell.tx : undefined, recipe, inputs, resultCell, diff --git a/packages/runner/src/schema.ts b/packages/runner/src/schema.ts index d43e7560a..24d273830 100644 --- a/packages/runner/src/schema.ts +++ b/packages/runner/src/schema.ts @@ -270,7 +270,7 @@ export function validateAndTransform( // from the tx. Once tx.commit() is called, all that data is either available // via other transactions or has been rolled back. Either way, we want to // reflect that reality. - if (!tx?.status().ok?.open) tx = undefined; + if (tx?.status().status !== "ready") tx = undefined; // Reconstruct doc, path, schema, rootSchema from link and runtime const schema = link.schema; diff --git a/packages/runner/src/storage/interface.ts b/packages/runner/src/storage/interface.ts index 99051a548..cf1d289bc 100644 --- a/packages/runner/src/storage/interface.ts +++ b/packages/runner/src/storage/interface.ts @@ -380,7 +380,11 @@ export type StorageTransactionStatus = | { status: "ready"; journal: ITransactionJournal } | { status: "pending"; journal: ITransactionJournal } | { status: "done"; journal: ITransactionJournal } - | { status: "error"; journal: ITransactionJournal; error: StorageTransactionFailed }; + | { + status: "error"; + journal: ITransactionJournal; + error: StorageTransactionFailed; + }; /** * Representation of a storage transaction, which can be used to query facts and @@ -396,19 +400,16 @@ export type StorageTransactionStatus = */ export interface IStorageTransaction { /** - * Describes current status of the transaction. If transaction has failed - * or was cancelled result will be an error with a corresponding error variant. - * If transaction is being built it will have `open` status, if commit was - * called but promise has not resolved yet it will be `pending`. If commit - * successfully completed it will be `done`. - * - * Please note that if storage was updated since transaction was created such - * that any of the invariants have changed status will be change to - * `IStorageConsistencyError` even though transaction has not being commited. - * This allows transactor to cancel and recreate transaction with a current - * state without having to build up a whole transaction and commiting it. + * Describes current status of the transaction. Returns a union type with + * status field indicating the current state: + * - `"ready"`: Transaction is being built and ready for operations + * - `"pending"`: Commit was called but promise has not resolved yet + * - `"done"`: Commit successfully completed + * - `"error"`: Transaction has failed or was cancelled, includes error details + + * Each status variant includes a `journal` field with transaction operations. */ - // status(): StorageTransactionStatus; + status(): StorageTransactionStatus; /** * Helper that is the same as `reader().read()` but more convenient, as it @@ -491,21 +492,6 @@ export interface IStorageTransaction { } export interface IExtendedStorageTransaction extends IStorageTransaction { - /** - * Describes current status of the transaction. If transaction has failed - * or was cancelled result will be an error with a corresponding error variant. - * If transaction is being built it will have `open` status, if commit was - * called but promise has not resolved yet it will be `pending`. If commit - * successfully completed it will be `done`. - * - * Please note that if storage was updated since transaction was created such - * that any of the invariants have changed status will be change to - * `IStorageConsistencyError` even though transaction has not being commited. - * This allows transactor to cancel and recreate transaction with a current - * state without having to build up a whole transaction and commiting it. - */ - status(): Result; - /** * Reads a value from a (local) memory address and throws on error, except for * `NotFoundError` which is returned as undefined. diff --git a/packages/runner/src/storage/transaction-shim.ts b/packages/runner/src/storage/transaction-shim.ts index 399859825..39f0fce01 100644 --- a/packages/runner/src/storage/transaction-shim.ts +++ b/packages/runner/src/storage/transaction-shim.ts @@ -14,6 +14,7 @@ import type { IStorageTransactionInvariant, IStorageTransactionLog, IStorageTransactionProgress, + ITransactionJournal, ITransactionReader, ITransactionWriter, IUnsupportedMediaTypeError, @@ -25,6 +26,7 @@ import type { ReadError, Result, StorageTransactionFailed, + StorageTransactionStatus, Unit, Write, WriteError, @@ -400,6 +402,35 @@ export class StorageTransaction implements IStorageTransaction { constructor(private runtime: IRuntime) {} + status(): StorageTransactionStatus { + // Create a minimal ITransactionJournal adapter for this.txLog + const notImplementedError: InactiveTransactionError = { + name: "StorageTransactionCompleteError", + message: "Not implemented", + }; + + const journal: ITransactionJournal = { + activity: () => [], + novelty: () => [], + history: () => [], + reader: () => ({ error: notImplementedError }), + writer: () => ({ error: notImplementedError }), + close: () => ({ error: notImplementedError }), + abort: () => ({ error: notImplementedError }), + }; + + if (this.currentStatus.open) { + return { status: "ready", journal }; + } else if (this.currentStatus.pending) { + return { status: "pending", journal }; + } else if (this.currentStatus.done) { + return { status: "done", journal }; + } else { + // This should not happen in normal flow, but return ready as fallback + return { status: "ready", journal }; + } + } + reader(space: MemorySpace): Result { if (this.currentStatus.open === undefined) { const error = new Error( @@ -518,15 +549,8 @@ export class StorageTransaction implements IStorageTransaction { export class ExtendedStorageTransaction implements IExtendedStorageTransaction { constructor(private tx: IStorageTransaction) {} - status(): Result { - // If the underlying transaction has status, use it; otherwise, return a default - // This assumes the underlying transaction is a StorageTransaction from this file - // and has currentStatus and txLog, otherwise this will need to be adapted - if (typeof (this.tx as any).currentStatus !== "undefined") { - return { ok: (this.tx as any).currentStatus }; - } - // Fallback: not available - return { ok: { open: (this.tx as any).txLog } }; + status(): StorageTransactionStatus { + return this.tx.status(); } log(): IStorageTransactionLog { diff --git a/packages/runner/test/storage-transaction-shim.test.ts b/packages/runner/test/storage-transaction-shim.test.ts index df2ca00c4..2a37bdf72 100644 --- a/packages/runner/test/storage-transaction-shim.test.ts +++ b/packages/runner/test/storage-transaction-shim.test.ts @@ -32,8 +32,7 @@ describe("StorageTransaction", () => { // Check initial status const statusResult = transaction.status(); - expect(statusResult.ok).toBeDefined(); - expect(statusResult.ok?.open).toBeDefined(); + expect(statusResult.status).toBe("ready"); // First write to root path to create a record const rootWriteResult = transaction.write({ @@ -110,7 +109,7 @@ describe("StorageTransaction", () => { // Check final status const finalStatusResult = transaction.status(); - expect(finalStatusResult.ok?.done).toBeDefined(); + expect(finalStatusResult.status).toBe("done"); }); it("should handle transaction abort", async () => { From 79547fff4493d2f7acec8f6d7434f0295b947e46 Mon Sep 17 00:00:00 2001 From: Irakli Gozalishvili Date: Fri, 18 Jul 2025 09:13:23 -0700 Subject: [PATCH 3/5] Refactor storage transaction journal and activity tracking --- packages/runner/src/scheduler.ts | 18 +- packages/runner/src/storage/interface.ts | 32 +--- .../runner/src/storage/transaction-shim.ts | 159 ++++++++++-------- 3 files changed, 108 insertions(+), 101 deletions(-) diff --git a/packages/runner/src/scheduler.ts b/packages/runner/src/scheduler.ts index ba3fd0f92..559fde764 100644 --- a/packages/runner/src/scheduler.ts +++ b/packages/runner/src/scheduler.ts @@ -446,17 +446,21 @@ export function txToReactivityLog( tx: IExtendedStorageTransaction, ): ReactivityLog { const log: ReactivityLog = { reads: [], writes: [] }; - for (const change of tx.log()) { - if ("read" in change) { + for (const activity of tx.log()) { + if ("read" in activity && activity.read) { log.reads.push({ - ...change.read!.address, - path: change.read!.address.path.slice(1), // Remove the "value" prefix + space: activity.read.space, + id: activity.read.id, + type: activity.read.type, + path: activity.read.path.slice(1), // Remove the "value" prefix }); } - if ("write" in change) { + if ("write" in activity && activity.write) { log.writes.push({ - ...change.write!.address, - path: change.write!.address.path.slice(1), + space: activity.write.space, + id: activity.write.id, + type: activity.write.type, + path: activity.write.path.slice(1), }); } } diff --git a/packages/runner/src/storage/interface.ts b/packages/runner/src/storage/interface.ts index cf1d289bc..2b5ca7bb2 100644 --- a/packages/runner/src/storage/interface.ts +++ b/packages/runner/src/storage/interface.ts @@ -56,17 +56,6 @@ export type Read = IAttestation; */ export type Write = IAttestation; -export interface IStorageTransactionInvariant { - read?: IStorageInvariant; - write?: IStorageInvariant; -} - -export interface IStorageTransactionLog { - get(address: IMemorySpaceAddress): IStorageTransactionInvariant; - addRead(read: IStorageInvariant): void; - addWrite(write: IStorageInvariant): void; - [Symbol.iterator](): Iterator; -} // This type is used to tag a document with any important metadata. // Currently, the only supported type is the classification. @@ -371,11 +360,6 @@ export interface IMemoryChange { after: JSONValue | undefined; } -export type IStorageTransactionProgress = Variant<{ - open: IStorageTransactionLog; - pending: IStorageTransactionLog; - done: IStorageTransactionLog; -}>; export type StorageTransactionStatus = | { status: "ready"; journal: ITransactionJournal } | { status: "pending"; journal: ITransactionJournal } @@ -539,18 +523,18 @@ export interface IExtendedStorageTransaction extends IStorageTransaction { ): void; /** - * Returns the log of the transaction. + * Returns the transaction journal. * - * The log is a list of changes that have been made to the transaction. + * The journal contains a list of activities (reads and writes) that have been made to the transaction. * It is used to track the dependencies of the transaction. * - * If the transaction is aborted, the log reflects the attempted reads and - * writes. If the transaction is committed, the log reflects the actual reads + * If the transaction is aborted, the activity reflects the attempted reads and + * writes. If the transaction is committed, the activity reflects the actual reads * and writes. * - * @deprecated + * @deprecated - Use ITransactionJournal.activity() instead */ - log(): IStorageTransactionLog; + log(): Iterable; } export interface ITransactionReader { @@ -912,7 +896,3 @@ export interface IAttestation { readonly value?: JSONValue; } -export interface IStorageInvariant { - readonly address: IMemorySpaceAddress; - readonly value?: JSONValue; -} diff --git a/packages/runner/src/storage/transaction-shim.ts b/packages/runner/src/storage/transaction-shim.ts index 39f0fce01..9e0620997 100644 --- a/packages/runner/src/storage/transaction-shim.ts +++ b/packages/runner/src/storage/transaction-shim.ts @@ -1,26 +1,27 @@ import { isObject, isRecord } from "@commontools/utils/types"; import type { + Activity, CommitError, + IAttestation, IExtendedStorageTransaction, IInvalidDataURIError, IMemorySpaceAddress, InactiveTransactionError, INotFoundError, + IReadActivity, IReadOptions, - IStorageInvariant, IStorageTransaction, IStorageTransactionComplete, IStorageTransactionInconsistent, - IStorageTransactionInvariant, - IStorageTransactionLog, - IStorageTransactionProgress, ITransactionJournal, ITransactionReader, ITransactionWriter, IUnsupportedMediaTypeError, + JournalArchive, JSONValue, MemoryAddressPathComponent, MemorySpace, + Metadata, Read, ReaderError, ReadError, @@ -105,25 +106,70 @@ function validateParentPath( } /** - * Simple implementation of IStorageTransactionLog that tracks read/write operations + * Simple implementation of ITransactionJournal for tracking read/write operations */ -class StorageTransactionLog implements IStorageTransactionLog { - private log: IStorageTransactionInvariant[] = []; +class TransactionJournal implements ITransactionJournal { + private activities: Activity[] = []; - get(_address: IMemorySpaceAddress): IStorageTransactionInvariant { - throw new Error("Not implemented"); + addRead(address: IMemorySpaceAddress, options?: IReadOptions): void { + const readActivity: IReadActivity = { + ...address, + meta: options?.meta ?? {}, + }; + this.activities.push({ read: readActivity }); + } + + addWrite(address: IMemorySpaceAddress): void { + this.activities.push({ write: address }); } - addRead(read: IStorageInvariant): void { - this.log.push({ read }); + // ITransactionJournal implementation + activity(): Iterable { + return this; } - addWrite(write: IStorageInvariant): void { - this.log.push({ write }); + novelty(): Iterable { + return []; } - [Symbol.iterator](): Iterator { - return this.log[Symbol.iterator](); + history(): Iterable { + return []; + } + + reader(): Result { + const error: InactiveTransactionError = { + name: "StorageTransactionCompleteError", + message: "Not implemented", + }; + return { error }; + } + + writer(): Result { + const error: InactiveTransactionError = { + name: "StorageTransactionCompleteError", + message: "Not implemented", + }; + return { error }; + } + + close(): Result { + const error: InactiveTransactionError = { + name: "StorageTransactionCompleteError", + message: "Not implemented", + }; + return { error }; + } + + abort(): Result { + const error: InactiveTransactionError = { + name: "StorageTransactionCompleteError", + message: "Not implemented", + }; + return { error }; + } + + [Symbol.iterator](): Iterator { + return this.activities[Symbol.iterator](); } } @@ -134,7 +180,7 @@ class TransactionReader implements ITransactionReader { constructor( protected runtime: IRuntime, protected space: MemorySpace, - protected log: StorageTransactionLog, + protected journal: TransactionJournal, ) {} did() { @@ -169,11 +215,11 @@ class TransactionReader implements ITransactionReader { const value = getValueAtPath(json, address.path); - const read: IStorageInvariant = { + const read: Read = { address, value, }; - this.log.addRead(read); + this.journal.addRead(address, options); return { ok: read }; } catch (error) { @@ -224,11 +270,11 @@ class TransactionReader implements ITransactionReader { } // Read from doc itself const value = doc.getAtPath(rest); - const read: IStorageInvariant = { + const read: Read = { address, value, }; - this.log.addRead(read); + this.journal.addRead(address, options); return { ok: read }; } else if (first === "source") { // Only allow path length 1 @@ -246,11 +292,11 @@ class TransactionReader implements ITransactionReader { // Convert EntityId to URI string value = `of:${JSON.parse(JSON.stringify(sourceCell.entityId))["/"]}`; } - const read: IStorageInvariant = { + const read: Read = { address, value, }; - this.log.addRead(read); + this.journal.addRead(address, options); return { ok: read }; } else { const notFoundError: INotFoundError = new Error( @@ -337,11 +383,11 @@ class TransactionWriter extends TransactionReader } // Write to doc itself doc.setAtPath(rest, value); - const write: IStorageInvariant = { + const write: Write = { address, value, }; - this.log.addWrite(write); + this.journal.addWrite(address); return { ok: write }; } else if (first === "source") { // Only allow path length 1 @@ -375,11 +421,11 @@ class TransactionWriter extends TransactionReader return { ok: undefined, error: notFoundError }; } doc.sourceCell = sourceDoc; - const write: IStorageInvariant = { + const write: Write = { address, value, }; - this.log.addWrite(write); + this.journal.addWrite(address); return { ok: write }; } else { const notFoundError: INotFoundError = new Error( @@ -395,44 +441,21 @@ class TransactionWriter extends TransactionReader * Implementation of IStorageTransaction that uses DocImpl and runtime.documentMap */ export class StorageTransaction implements IStorageTransaction { - private txLog = new StorageTransactionLog(); - private currentStatus: IStorageTransactionProgress = { open: this.txLog }; + private journal = new TransactionJournal(); + private currentStatus: StorageTransactionStatus; private readers = new Map(); private writers = new Map(); - constructor(private runtime: IRuntime) {} + constructor(private runtime: IRuntime) { + this.currentStatus = { status: "ready", journal: this.journal }; + } status(): StorageTransactionStatus { - // Create a minimal ITransactionJournal adapter for this.txLog - const notImplementedError: InactiveTransactionError = { - name: "StorageTransactionCompleteError", - message: "Not implemented", - }; - - const journal: ITransactionJournal = { - activity: () => [], - novelty: () => [], - history: () => [], - reader: () => ({ error: notImplementedError }), - writer: () => ({ error: notImplementedError }), - close: () => ({ error: notImplementedError }), - abort: () => ({ error: notImplementedError }), - }; - - if (this.currentStatus.open) { - return { status: "ready", journal }; - } else if (this.currentStatus.pending) { - return { status: "pending", journal }; - } else if (this.currentStatus.done) { - return { status: "done", journal }; - } else { - // This should not happen in normal flow, but return ready as fallback - return { status: "ready", journal }; - } + return this.currentStatus; } reader(space: MemorySpace): Result { - if (this.currentStatus.open === undefined) { + if (this.currentStatus.status !== "ready") { const error = new Error( "Storage transaction complete", ) as IStorageTransactionComplete; @@ -445,7 +468,7 @@ export class StorageTransaction implements IStorageTransaction { let reader = this.readers.get(space); if (!reader) { - reader = new TransactionReader(this.runtime, space, this.txLog); + reader = new TransactionReader(this.runtime, space, this.journal); this.readers.set(space, reader); } @@ -466,7 +489,7 @@ export class StorageTransaction implements IStorageTransaction { } writer(space: MemorySpace): Result { - if (this.currentStatus.open === undefined) { + if (this.currentStatus.status !== "ready") { const error = new Error( "Storage transaction complete", ) as IStorageTransactionComplete; @@ -491,7 +514,7 @@ export class StorageTransaction implements IStorageTransaction { let writer = this.writers.get(space); if (!writer) { - writer = new TransactionWriter(this.runtime, space, this.txLog); + writer = new TransactionWriter(this.runtime, space, this.journal); this.writers.set(space, writer); } @@ -515,7 +538,7 @@ export class StorageTransaction implements IStorageTransaction { } abort(reason?: any): Result { - if (this.currentStatus.open === undefined) { + if (this.currentStatus.status !== "ready") { const error = new Error( "Storage transaction complete", ) as IStorageTransactionComplete; @@ -526,13 +549,13 @@ export class StorageTransaction implements IStorageTransaction { }; } - // Set status to done with the current log to indicate the transaction is complete - this.currentStatus = { done: this.txLog }; + // Set status to done with the current journal to indicate the transaction is complete + this.currentStatus = { status: "done", journal: this.currentStatus.journal }; return { ok: undefined }; } commit(): Promise> { - if (this.currentStatus.open === undefined) { + if (this.currentStatus.status !== "ready") { const error: any = new Error("Transaction already aborted"); error.name = "StorageTransactionAborted"; error.reason = "Transaction was aborted"; @@ -541,7 +564,7 @@ export class StorageTransaction implements IStorageTransaction { // For now, just mark as done since we're only implementing basic read/write // In a real implementation, this would send the transaction to upstream storage - this.currentStatus = { done: this.txLog }; + this.currentStatus = { status: "done", journal: this.currentStatus.journal }; return Promise.resolve({ ok: {} }); } } @@ -553,11 +576,11 @@ export class ExtendedStorageTransaction implements IExtendedStorageTransaction { return this.tx.status(); } - log(): IStorageTransactionLog { - if (typeof (this.tx as any).txLog !== "undefined") { - return (this.tx as any).txLog; + log(): TransactionJournal { + if (typeof (this.tx as any).journal !== "undefined") { + return (this.tx as any).journal; } - throw new Error("Underlying transaction does not expose log"); + throw new Error("Underlying transaction does not expose transaction journal"); } reader(space: MemorySpace): Result { From f3efe3a3a908728a535c1d83b71f494cbde1afde Mon Sep 17 00:00:00 2001 From: Irakli Gozalishvili Date: Fri, 18 Jul 2025 09:32:46 -0700 Subject: [PATCH 4/5] Refactor TransactionJournal to use private field and improve methods --- packages/runner/src/storage/interface.ts | 28 ----- .../runner/src/storage/transaction-shim.ts | 112 +++++++++--------- 2 files changed, 55 insertions(+), 85 deletions(-) diff --git a/packages/runner/src/storage/interface.ts b/packages/runner/src/storage/interface.ts index 2b5ca7bb2..a07a9ea8e 100644 --- a/packages/runner/src/storage/interface.ts +++ b/packages/runner/src/storage/interface.ts @@ -56,7 +56,6 @@ export type Read = IAttestation; */ export type Write = IAttestation; - // This type is used to tag a document with any important metadata. // Currently, the only supported type is the classification. export type Labels = { @@ -808,32 +807,6 @@ export interface ITransactionJournal { novelty(space: MemorySpace): Iterable; history(space: MemorySpace): Iterable; - - reader( - space: MemorySpace, - ): Result; - - writer( - space: MemorySpace, - ): Result; - - /** - * Closes underlying transaction, making it non-editable going forward. Any - * attempts to edit it will fail. - */ - close(): Result, InactiveTransactionError>; - - /** - * Aborts underlying transaction, making it non-editable going forward. Any - * attempts to edit it will fail. - */ - abort(reason?: unknown): Result; -} - -export interface EditableJournal { - activity(): Iterable; - novelty: Iterable; - history(): Iterable; } export interface ITransaction { @@ -895,4 +868,3 @@ export interface IAttestation { readonly address: IMemoryAddress; readonly value?: JSONValue; } - diff --git a/packages/runner/src/storage/transaction-shim.ts b/packages/runner/src/storage/transaction-shim.ts index 9e0620997..e8a19834b 100644 --- a/packages/runner/src/storage/transaction-shim.ts +++ b/packages/runner/src/storage/transaction-shim.ts @@ -17,7 +17,6 @@ import type { ITransactionReader, ITransactionWriter, IUnsupportedMediaTypeError, - JournalArchive, JSONValue, MemoryAddressPathComponent, MemorySpace, @@ -109,67 +108,57 @@ function validateParentPath( * Simple implementation of ITransactionJournal for tracking read/write operations */ class TransactionJournal implements ITransactionJournal { - private activities: Activity[] = []; + #activity: Activity[] = []; addRead(address: IMemorySpaceAddress, options?: IReadOptions): void { const readActivity: IReadActivity = { ...address, meta: options?.meta ?? {}, }; - this.activities.push({ read: readActivity }); + this.#activity.push({ read: readActivity }); } addWrite(address: IMemorySpaceAddress): void { - this.activities.push({ write: address }); + this.#activity.push({ write: address }); } // ITransactionJournal implementation activity(): Iterable { - return this; - } - - novelty(): Iterable { - return []; - } - - history(): Iterable { - return []; - } - - reader(): Result { - const error: InactiveTransactionError = { - name: "StorageTransactionCompleteError", - message: "Not implemented", - }; - return { error }; - } - - writer(): Result { - const error: InactiveTransactionError = { - name: "StorageTransactionCompleteError", - message: "Not implemented", - }; - return { error }; - } - - close(): Result { - const error: InactiveTransactionError = { - name: "StorageTransactionCompleteError", - message: "Not implemented", - }; - return { error }; - } - - abort(): Result { - const error: InactiveTransactionError = { - name: "StorageTransactionCompleteError", - message: "Not implemented", - }; - return { error }; + return this.#activity; + } + + *novelty(space: MemorySpace): Iterable { + for (const activity of this.#activity) { + if (activity.write) { + if (activity.write.space === space) { + yield { + address: { + id: activity.write.id, + type: activity.write.type, + path: activity.write.path, + }, + value: undefined, // Value not available in activity log + }; + } + } + } } - [Symbol.iterator](): Iterator { - return this.activities[Symbol.iterator](); + *history(space: MemorySpace): Iterable { + for (const activity of this.#activity) { + if (activity.read) { + if (activity.read.space === space) { + yield { + address: { + id: activity.read.id, + type: activity.read.type, + path: activity.read.path, + }, + value: undefined, // Value not available in activity log + }; + } + } + } } } @@ -441,7 +430,7 @@ class TransactionWriter extends TransactionReader * Implementation of IStorageTransaction that uses DocImpl and runtime.documentMap */ export class StorageTransaction implements IStorageTransaction { - private journal = new TransactionJournal(); + journal = new TransactionJournal(); private currentStatus: StorageTransactionStatus; private readers = new Map(); private writers = new Map(); @@ -475,7 +464,10 @@ export class StorageTransaction implements IStorageTransaction { return { ok: reader }; } - read(address: IMemorySpaceAddress, options?: IReadOptions): Result { + read( + address: IMemorySpaceAddress, + options?: IReadOptions, + ): Result { const readerResult = this.reader(address.space); if (readerResult.error) { return { ok: undefined, error: readerResult.error }; @@ -550,7 +542,10 @@ export class StorageTransaction implements IStorageTransaction { } // Set status to done with the current journal to indicate the transaction is complete - this.currentStatus = { status: "done", journal: this.currentStatus.journal }; + this.currentStatus = { + status: "done", + journal: this.currentStatus.journal, + }; return { ok: undefined }; } @@ -564,7 +559,10 @@ export class StorageTransaction implements IStorageTransaction { // For now, just mark as done since we're only implementing basic read/write // In a real implementation, this would send the transaction to upstream storage - this.currentStatus = { status: "done", journal: this.currentStatus.journal }; + this.currentStatus = { + status: "done", + journal: this.currentStatus.journal, + }; return Promise.resolve({ ok: {} }); } } @@ -576,18 +574,18 @@ export class ExtendedStorageTransaction implements IExtendedStorageTransaction { return this.tx.status(); } - log(): TransactionJournal { - if (typeof (this.tx as any).journal !== "undefined") { - return (this.tx as any).journal; - } - throw new Error("Underlying transaction does not expose transaction journal"); + log(): Iterable { + return this.tx.status().journal.activity(); } reader(space: MemorySpace): Result { return this.tx.reader(space); } - read(address: IMemorySpaceAddress, options?: IReadOptions): Result { + read( + address: IMemorySpaceAddress, + options?: IReadOptions, + ): Result { return this.tx.read(address, options); } From c2c69287cf5baedd27eb06f1a634b935ef67cac9 Mon Sep 17 00:00:00 2001 From: Irakli Gozalishvili Date: Fri, 18 Jul 2025 09:37:30 -0700 Subject: [PATCH 5/5] Update transaction logging and journal access --- packages/runner/src/scheduler.ts | 2 +- packages/runner/src/storage/interface.ts | 19 ++++++------------- .../runner/src/storage/transaction-shim.ts | 8 ++++---- packages/runner/src/storage/transaction.ts | 4 ++++ 4 files changed, 15 insertions(+), 18 deletions(-) diff --git a/packages/runner/src/scheduler.ts b/packages/runner/src/scheduler.ts index 559fde764..f75d8116d 100644 --- a/packages/runner/src/scheduler.ts +++ b/packages/runner/src/scheduler.ts @@ -446,7 +446,7 @@ export function txToReactivityLog( tx: IExtendedStorageTransaction, ): ReactivityLog { const log: ReactivityLog = { reads: [], writes: [] }; - for (const activity of tx.log()) { + for (const activity of tx.journal.activity()) { if ("read" in activity && activity.read) { log.reads.push({ space: activity.read.space, diff --git a/packages/runner/src/storage/interface.ts b/packages/runner/src/storage/interface.ts index a07a9ea8e..b1e2fd10f 100644 --- a/packages/runner/src/storage/interface.ts +++ b/packages/runner/src/storage/interface.ts @@ -382,6 +382,12 @@ export type StorageTransactionStatus = * invariants have being invalidated, or reject and fail commit. */ export interface IStorageTransaction { + /** + * The transaction journal containing all read and write activities. + * Provides access to transaction operations and dependency tracking. + */ + readonly journal: ITransactionJournal; + /** * Describes current status of the transaction. Returns a union type with * status field indicating the current state: @@ -521,19 +527,6 @@ export interface IExtendedStorageTransaction extends IStorageTransaction { value: JSONValue | undefined, ): void; - /** - * Returns the transaction journal. - * - * The journal contains a list of activities (reads and writes) that have been made to the transaction. - * It is used to track the dependencies of the transaction. - * - * If the transaction is aborted, the activity reflects the attempted reads and - * writes. If the transaction is committed, the activity reflects the actual reads - * and writes. - * - * @deprecated - Use ITransactionJournal.activity() instead - */ - log(): Iterable; } export interface ITransactionReader { diff --git a/packages/runner/src/storage/transaction-shim.ts b/packages/runner/src/storage/transaction-shim.ts index e8a19834b..414ef1cf4 100644 --- a/packages/runner/src/storage/transaction-shim.ts +++ b/packages/runner/src/storage/transaction-shim.ts @@ -570,12 +570,12 @@ export class StorageTransaction implements IStorageTransaction { export class ExtendedStorageTransaction implements IExtendedStorageTransaction { constructor(private tx: IStorageTransaction) {} - status(): StorageTransactionStatus { - return this.tx.status(); + get journal(): ITransactionJournal { + return this.tx.journal; } - log(): Iterable { - return this.tx.status().journal.activity(); + status(): StorageTransactionStatus { + return this.tx.status(); } reader(space: MemorySpace): Result { diff --git a/packages/runner/src/storage/transaction.ts b/packages/runner/src/storage/transaction.ts index 47cb8e09b..5b2cce83e 100644 --- a/packages/runner/src/storage/transaction.ts +++ b/packages/runner/src/storage/transaction.ts @@ -73,6 +73,10 @@ class StorageTransaction implements IStorageTransaction { this.#state = state; } + get journal() { + return this.#state.journal; + } + status(): StorageTransactionStatus { return status(this); }