From 8836e36ec1c11219cb3d067111f199bccde6ad2d Mon Sep 17 00:00:00 2001 From: Bernhard Seefeld Date: Fri, 5 Sep 2025 15:11:19 -0700 Subject: [PATCH 01/11] chore: add retries for write commits in core packages --- .../src/space-manager.ts | 32 +-- .../background-charm-service/src/utils.ts | 38 +-- packages/charm/src/iterate.ts | 19 +- packages/charm/src/manager.ts | 229 +++++++++--------- packages/charm/src/ops/charm-controller.ts | 29 ++- packages/runner/src/builder/json-utils.ts | 4 +- packages/runner/src/cell.ts | 3 + packages/runner/src/runner.ts | 64 ++--- packages/runner/src/runtime.ts | 14 +- packages/shell/src/lib/default-recipe.ts | 27 ++- packages/shell/src/lib/iframe-ctx.ts | 3 + .../google-oauth/google-oauth.utils.ts | 18 +- 12 files changed, 245 insertions(+), 235 deletions(-) diff --git a/packages/background-charm-service/src/space-manager.ts b/packages/background-charm-service/src/space-manager.ts index 336433a6c..0a9958095 100644 --- a/packages/background-charm-service/src/space-manager.ts +++ b/packages/background-charm-service/src/space-manager.ts @@ -198,12 +198,12 @@ export class SpaceManager { this.failureTracking.delete(charmId); } - const tx = entry.runtime.edit(); - entry.withTx(tx).update({ - lastRun: Date.now(), - status: "Success", + entry.runtime.editWithRetry((tx) => { + entry.withTx(tx).update({ + lastRun: Date.now(), + status: "Success", + }); }); - tx.commit(); // TODO(seefeld): We don't retry writing this. Should we? if (this.enabledCharms.has(charmId)) { this.pushTask(charmId, entry); @@ -224,12 +224,12 @@ export class SpaceManager { this.disableCharm(charmId, entry, error); } else { this.failureTracking.set(charmId, failureCount); - const tx = entry.runtime.edit(); - entry.withTx(tx).update({ - lastRun: Date.now(), - status: error, + entry.runtime.editWithRetry((tx) => { + entry.withTx(tx).update({ + lastRun: Date.now(), + status: error, + }); }); - tx.commit(); // TODO(seefeld): We don't retry writing this. Should we? if (this.enabledCharms.has(charmId)) { // Apply a linear backoff for the next attempts @@ -247,13 +247,13 @@ export class SpaceManager { entry: Cell, error: string, ) { - const tx = entry.runtime.edit(); - entry.withTx(tx).update({ - disabledAt: Date.now(), - lastRun: Date.now(), - status: `Disabled: ${error}`, + entry.runtime.editWithRetry((tx) => { + entry.withTx(tx).update({ + disabledAt: Date.now(), + lastRun: Date.now(), + status: `Disabled: ${error}`, + }); }); - tx.commit(); // TODO(seefeld): We don't retry writing this. Should we? this.enabledCharms.delete(charmId); this.pendingTasks = this.pendingTasks.filter((r) => r.charmId !== charmId); diff --git a/packages/background-charm-service/src/utils.ts b/packages/background-charm-service/src/utils.ts index f3f2f231c..d803ee7b1 100644 --- a/packages/background-charm-service/src/utils.ts +++ b/packages/background-charm-service/src/utils.ts @@ -87,19 +87,19 @@ export async function setBGCharm({ if (existingCharmIndex === -1) { console.log("Adding charm to BGUpdater charms cell"); - const tx = runtime.edit(); - charmsCell.withTx(tx).push({ - [ID]: `${space}/${charmId}`, - space, - charmId, - integration, - createdAt: Date.now(), - updatedAt: Date.now(), - disabledAt: undefined, - lastRun: 0, - status: "Initializing", - } as unknown as Cell); - await tx.commit(); + runtime.editWithRetry((tx) => { + charmsCell.withTx(tx).push({ + [ID]: `${space}/${charmId}`, + space, + charmId, + integration, + createdAt: Date.now(), + updatedAt: Date.now(), + disabledAt: undefined, + lastRun: 0, + status: "Initializing", + } as unknown as Cell); + }); // Ensure changes are synced await runtime.storageManager.synced(); @@ -108,13 +108,13 @@ export async function setBGCharm({ } else { console.log("Charm already exists in BGUpdater charms cell, re-enabling"); const existingCharm = charms[existingCharmIndex]; - const tx = runtime.edit(); - existingCharm.withTx(tx).update({ - disabledAt: 0, - updatedAt: Date.now(), - status: "Re-initializing", + runtime.editWithRetry((tx) => { + existingCharm.withTx(tx).update({ + disabledAt: 0, + updatedAt: Date.now(), + status: "Re-initializing", + }); }); - tx.commit(); // TODO(seefeld): We don't retry writing this. Should we? await runtime.storageManager.synced(); return false; diff --git a/packages/charm/src/iterate.ts b/packages/charm/src/iterate.ts index 31a0dd9d0..c9ec709a0 100644 --- a/packages/charm/src/iterate.ts +++ b/packages/charm/src/iterate.ts @@ -202,15 +202,16 @@ export const generateNewRecipeVersion = async ( llmRequestId, ); - const tx = newCharm.runtime.edit(); - newCharm.withTx(tx).getSourceCell(charmSourceCellSchema)?.key("lineage").push( - { - charm: parent, - relation: "iterate", - timestamp: Date.now(), - }, - ); - await tx.commit(); // TODO(seefeld): We don't retry writing this. Should we? + newCharm.runtime.editWithRetry((tx) => { + newCharm.withTx(tx).getSourceCell(charmSourceCellSchema)?.key("lineage") + .push( + { + charm: parent, + relation: "iterate", + timestamp: Date.now(), + }, + ); + }); return newCharm; }; diff --git a/packages/charm/src/manager.ts b/packages/charm/src/manager.ts index e66f96fa0..a9485cc96 100644 --- a/packages/charm/src/manager.ts +++ b/packages/charm/src/manager.ts @@ -183,19 +183,19 @@ export class CharmManager { } async unpinById(charmId: EntityId) { - await this.syncCharms(this.pinnedCharms); - const newPinnedCharms = filterOutEntity(this.pinnedCharms, charmId); - - if (newPinnedCharms.length !== this.pinnedCharms.get().length) { - const tx = this.runtime.edit(); - const pinnedCharms = this.pinnedCharms.withTx(tx); - pinnedCharms.set(newPinnedCharms); - await tx.commit(); - await this.runtime.idle(); - return true; - } + let changed = false; - return false; + await this.syncCharms(this.pinnedCharms); + return await this.runtime.editWithRetry((tx) => { + const newPinnedCharms = filterOutEntity( + this.pinnedCharms.withTx(tx), + charmId, + ); + if (newPinnedCharms.length !== this.pinnedCharms.get().length) { + this.pinnedCharms.withTx(tx).set(newPinnedCharms); + changed = true; + } + }) && changed; } async unpin(charm: Cell | string | EntityId) { @@ -219,28 +219,28 @@ export class CharmManager { await this.syncCharms(this.trashedCharms); await this.syncCharms(this.charms); - const tx = this.runtime.edit(); + let trashedCharm: Cell | undefined; - const trashedCharms = this.trashedCharms.withTx(tx); + this.runtime.editWithRetry((tx) => { + const trashedCharms = this.trashedCharms.withTx(tx); - const id = getEntityId(idOrCharm); - if (!id) return false; + const id = getEntityId(idOrCharm); + if (!id) return false; - // Find the charm in trash - const trashedCharm = trashedCharms.get().find((charm) => - isSameEntity(charm, id) - ); - - if (!trashedCharm) return false; + // Find the charm in trash + trashedCharm = trashedCharms.get().find((charm) => + isSameEntity(charm, id) + ); - // Remove from trash - const newTrashedCharms = filterOutEntity(trashedCharms, id); - trashedCharms.set(newTrashedCharms); + if (!trashedCharm) return false; - // Add back to charms - await this.add([trashedCharm], tx); + // Remove from trash + const newTrashedCharms = filterOutEntity(trashedCharms, id); + trashedCharms.set(newTrashedCharms); - await tx.commit(); // TODO(seefeld): Retry? + // Add back to charms + this.addCharms([trashedCharm], tx); + }); await this.runtime.idle(); return true; @@ -248,12 +248,10 @@ export class CharmManager { async emptyTrash() { await this.syncCharms(this.trashedCharms); - const tx = this.runtime.edit(); - const trashedCharms = this.trashedCharms.withTx(tx); - trashedCharms.set([]); - await tx.commit(); - await this.runtime.idle(); - return true; + return await this.runtime.editWithRetry((tx) => { + const trashedCharms = this.trashedCharms.withTx(tx); + trashedCharms.set([]); + }); } // FIXME(ja): this says it returns a list of charm, but it isn't! you will @@ -266,20 +264,27 @@ export class CharmManager { return this.charms; } - async add(newCharms: Cell[], tx?: IExtendedStorageTransaction) { - await this.syncCharms(this.charms); - await this.runtime.idle(); - - const transaction = tx ?? this.runtime.edit(); - const charms = this.charms.withTx(transaction); + private addCharms(newCharms: Cell[], tx: IExtendedStorageTransaction) { + const charms = this.charms.withTx(tx); newCharms.forEach((charm) => { if (!charms.get().some((otherCharm) => otherCharm.equals(charm))) { charms.push(charm); } }); + } - if (!tx) await transaction.commit(); // TODO(seefeld): Retry? + async add(newCharms: Cell[], tx?: IExtendedStorageTransaction) { + await this.syncCharms(this.charms); + await this.runtime.idle(); + + if (tx) { + this.addCharms(newCharms, tx); + } else { + await this.runtime.editWithRetry((tx) => { + this.addCharms(newCharms, tx); + }); + } } syncCharms(cell: Cell[]>) { @@ -789,6 +794,8 @@ export class CharmManager { // note: removing a charm doesn't clean up the charm's cells // Now moves the charm to trash instead of just removing it async remove(idOrCharm: string | EntityId | Cell) { + let success = false; + await Promise.all([ this.syncCharms(this.charms), this.syncCharms(this.pinnedCharms), @@ -800,50 +807,53 @@ export class CharmManager { await this.unpin(idOrCharm); - // Find the charm in the main list - const charm = this.charms.get().find((c) => isSameEntity(c, id)); - if (!charm) return false; + return await this.runtime.editWithRetry((tx) => { + // Find the charm in the main list + const charm = this.charms.withTx(tx).get().find((c) => + isSameEntity(c, id) + ); + if (!charm) { + success = false; + return; + } - // Move to trash if not already there - if (!this.trashedCharms.get().some((c) => isSameEntity(c, id))) { - const tx = this.runtime.edit(); - const trashedCharms = this.trashedCharms.withTx(tx); - trashedCharms.push(charm); - await tx.commit(); - } + // Move to trash if not already there + if ( + !this.trashedCharms.withTx(tx).get().some((c) => isSameEntity(c, id)) + ) { + this.trashedCharms.withTx(tx).push(charm); + } - // Remove from main list - const newCharms = filterOutEntity(this.charms, id); - if (newCharms.length !== this.charms.get().length) { - const tx = this.runtime.edit(); - const charms = this.charms.withTx(tx); - charms.set(newCharms); - await tx.commit(); - await this.runtime.idle(); - return true; - } + // Remove from main list + const newCharms = filterOutEntity(this.charms.withTx(tx), id); + if (newCharms.length !== this.charms.get().length) { + this.charms.withTx(tx).set(newCharms); + } - return false; + success = true; + }) && success; } // Permanently delete a charm (from trash or directly) async permanentlyDelete(idOrCharm: string | EntityId | Cell) { + let success = false; + await this.syncCharms(this.trashedCharms); const id = getEntityId(idOrCharm); if (!id) return false; - // Remove from trash if present - const newTrashedCharms = filterOutEntity(this.trashedCharms, id); - if (newTrashedCharms.length !== this.trashedCharms.get().length) { - const tx = this.runtime.edit(); - this.trashedCharms.withTx(tx).set(newTrashedCharms); - tx.commit(); - await this.runtime.idle(); - return true; - } - - return false; + return await this.runtime.editWithRetry((tx) => { + // Remove from trash if present + const newTrashedCharms = filterOutEntity( + this.trashedCharms.withTx(tx), + id, + ); + if (newTrashedCharms.length !== this.trashedCharms.get().length) { + this.trashedCharms.withTx(tx).set(newTrashedCharms); + success = true; + } + }) && success; } async runPersistent( @@ -915,11 +925,11 @@ export class CharmManager { await this.add([charm]); if (llmRequestId) { - const tx = this.runtime.edit(); - charm.getSourceCell(charmSourceCellSchema)?.key("llmRequestId") - .withTx(tx) - .set(llmRequestId); - await tx.commit(); + this.runtime.editWithRetry((tx) => { + charm.getSourceCell(charmSourceCellSchema)?.key("llmRequestId") + .withTx(tx) + .set(llmRequestId); + }); } return charm; @@ -1004,44 +1014,43 @@ export class CharmManager { "Target", ); - const tx = this.runtime.edit(); + await this.runtime.editWithRetry((tx) => { + // Navigate to the source path + // Cannot navigate `Cell` + // FIXME: types + // deno-lint-ignore no-explicit-any + let sourceResultCell = sourceCell.withTx(tx) as Cell; + // For charms, manager.get() already returns the result cell, so no need to add "result" - // Navigate to the source path - // Cannot navigate `Cell` - // FIXME: types - // deno-lint-ignore no-explicit-any - let sourceResultCell = sourceCell.withTx(tx) as Cell; - // For charms, manager.get() already returns the result cell, so no need to add "result" - - for (const segment of sourcePath) { - sourceResultCell = sourceResultCell.key(segment); - } + for (const segment of sourcePath) { + sourceResultCell = sourceResultCell.key(segment); + } - // Navigate to the target path - const targetKey = targetPath.pop(); - if (targetKey === undefined) { - throw new Error("Target path cannot be empty"); - } + // Navigate to the target path + const targetKey = targetPath.pop(); + if (targetKey === undefined) { + throw new Error("Target path cannot be empty"); + } - // Cannot navigate `Cell` - // FIXME: types - // deno-lint-ignore no-explicit-any - let targetInputCell = targetCell.withTx(tx) as Cell; - if (targetIsCharm) { - // For charms, target fields are in the source cell's argument - const sourceCell = targetCell.getSourceCell(processSchema); - if (!sourceCell) { - throw new Error("Target charm has no source cell"); + // Cannot navigate `Cell` + // FIXME: types + // deno-lint-ignore no-explicit-any + let targetInputCell = targetCell.withTx(tx) as Cell; + if (targetIsCharm) { + // For charms, target fields are in the source cell's argument + const sourceCell = targetCell.getSourceCell(processSchema); + if (!sourceCell) { + throw new Error("Target charm has no source cell"); + } + targetInputCell = sourceCell.key("argument").withTx(tx); } - targetInputCell = sourceCell.key("argument").withTx(tx); - } - for (const segment of targetPath) { - targetInputCell = targetInputCell.key(segment); - } + for (const segment of targetPath) { + targetInputCell = targetInputCell.key(segment); + } - targetInputCell.key(targetKey).set(sourceResultCell); - await tx.commit(); + targetInputCell.key(targetKey).set(sourceResultCell); + }); await this.runtime.idle(); await this.synced(); } diff --git a/packages/charm/src/ops/charm-controller.ts b/packages/charm/src/ops/charm-controller.ts index 6617f9b0a..2c61d0aac 100644 --- a/packages/charm/src/ops/charm-controller.ts +++ b/packages/charm/src/ops/charm-controller.ts @@ -37,23 +37,22 @@ class CharmPropIo implements CharmCellIo { async set(value: unknown, path?: CellPath) { const manager = this.#cc.manager(); - const tx = manager.runtime.edit(); - const targetCell = this.#getTargetCell(); - // Build the path with transaction context - let txCell = targetCell.withTx(tx); - for (const segment of (path ?? [])) { - txCell = txCell.key(segment); - } + await manager.runtime.editWithRetry((tx) => { + const targetCell = this.#getTargetCell(); + + // Build the path with transaction context + let txCell = targetCell.withTx(tx); + for (const segment of (path ?? [])) { + txCell = txCell.key(segment); + } + + // Set the value + // FIXME: types + // Charm input is not a Charm + txCell.set(value as Charm); + }); - // Set the value - // FIXME: types - // Charm input is not a Charm - txCell.set(value as Charm); - const result = await tx.commit(); - if (result.error) { - throw result.error; - } await manager.runtime.idle(); await manager.synced(); } diff --git a/packages/runner/src/builder/json-utils.ts b/packages/runner/src/builder/json-utils.ts index 2fadec53d..365bad2e8 100644 --- a/packages/runner/src/builder/json-utils.ts +++ b/packages/runner/src/builder/json-utils.ts @@ -152,9 +152,7 @@ export function createJsonSchema( return JSON.parse(JSON.stringify(seen.get(linkAsStr)!)); } - const tx = runtime?.edit(); - const cell = tx && runtime?.getCellFromLink(link, undefined, tx); - tx?.commit(); + const cell = runtime?.getCellFromLink(link); if (!cell) return {}; // TODO(seefeld): Should be `true` let schema = cell.schema; diff --git a/packages/runner/src/cell.ts b/packages/runner/src/cell.ts index c4115a420..a4e56a984 100644 --- a/packages/runner/src/cell.ts +++ b/packages/runner/src/cell.ts @@ -814,6 +814,9 @@ function subscribeToReferencedDocs( // Call the callback once with initial value. let cleanup: Cancel | undefined = callback(value); + // Technically unnecessary since we don't expect/allow callbacks to sink to + // write to other cells, and we rety by design anyway below when read data + // changed. But ideally we enforce read-only as well. tx.commit(); const cancel = runtime.scheduler.subscribe((tx) => { diff --git a/packages/runner/src/runner.ts b/packages/runner/src/runner.ts index e820909f0..246844bab 100644 --- a/packages/runner/src/runner.ts +++ b/packages/runner/src/runner.ts @@ -74,13 +74,13 @@ export class Runner implements IRunner { recipeFactory: NodeFactory, argument: T, resultCell: Cell, - ): Cell; + ): Promise>; setup( tx: IExtendedStorageTransaction | undefined, recipe: Recipe | Module | undefined, argument: T, resultCell: Cell, - ): Cell; + ): Promise>; /** * Configure charm without running it. If the charm is already running and the @@ -91,11 +91,16 @@ export class Runner implements IRunner { recipeOrModule: Recipe | Module | undefined, argument: T, resultCell: Cell, - ): Cell { - const tx = providedTx ?? this.runtime.edit(); - this.setupInternal(tx, recipeOrModule, argument, resultCell); - if (!providedTx) tx.commit(); - return resultCell; + ): Promise> { + if (providedTx) { + this.setupInternal(providedTx, recipeOrModule, argument, resultCell); + return Promise.resolve(resultCell); + } else { + return this.runtime.editWithRetry((tx) => { + this.setupInternal(tx, recipeOrModule, argument, resultCell); + return resultCell; + }).then(() => resultCell); + } } /** @@ -462,38 +467,35 @@ export class Runner implements IRunner { // run. Though most likely the worst case is just extra invocations. const givenTx = resultCell.tx?.status().status === "ready" && resultCell.tx; const tx = givenTx || this.runtime.edit(); - const setupRes = this.setupInternal( - tx, - recipe, - inputs, - resultCell.withTx(tx), - ); + let setupRes: ReturnType | undefined; + if (givenTx) { + // If tx is given, i.e. result cell was part of a tx that is still open, + // caller manages retries + setupRes = this.setupInternal( + givenTx, + recipe, + inputs, + resultCell.withTx(givenTx), + ); + } else { + await this.runtime.editWithRetry((tx) => { + setupRes = this.setupInternal( + tx, + recipe, + inputs, + resultCell.withTx(tx), + ); + }); + } // If a new recipe was specified, make sure to sync any new cells - // TODO(seefeld): Possible race condition here with lifted functions running - // and using old data to update a value that arrives just between starting and - // finishing the computation. Should be fixed by changing conflict resolution - // for derived values to be based on what they are derived from. if (recipe || !synced) { await this.syncCellsForRunningRecipe(resultCell.withTx(tx), recipe); } - if (setupRes.needsStart) { + if (setupRes?.needsStart) { this.startWithTx(tx, resultCell.withTx(tx), setupRes.recipe); } - const txPromise = givenTx ? undefined : tx.commit(); - - if (txPromise) { - const { error } = await txPromise; - if (error) { - console.error( - "Error committing transaction:", - error, - JSON.stringify((error as any).transaction?.args, null, 2), - ); - throw error; - } - } return recipe?.resultSchema ? resultCell.asSchema(recipe.resultSchema) diff --git a/packages/runner/src/runtime.ts b/packages/runner/src/runtime.ts index 91e1ef93a..fce89e65b 100644 --- a/packages/runner/src/runtime.ts +++ b/packages/runner/src/runtime.ts @@ -171,13 +171,13 @@ export interface IRuntime { recipeFactory: NodeFactory, argument: T, resultCell: Cell, - ): Cell; + ): Promise>; setup( tx: IExtendedStorageTransaction | undefined, recipe: Recipe | Module | undefined, argument: T, resultCell: Cell, - ): Cell; + ): Promise>; run( tx: IExtendedStorageTransaction, recipeFactory: NodeFactory, @@ -258,13 +258,13 @@ export interface IRunner { recipeFactory: NodeFactory, argument: T, resultCell: Cell, - ): Cell; + ): Promise>; setup( tx: IExtendedStorageTransaction | undefined, recipe: Recipe | Module | undefined, argument: T, resultCell: Cell, - ): Cell; + ): Promise>; run( tx: IExtendedStorageTransaction | undefined, @@ -581,19 +581,19 @@ export class Runtime implements IRuntime { recipeFactory: NodeFactory, argument: T, resultCell: Cell, - ): Cell; + ): Promise>; setup( tx: IExtendedStorageTransaction | undefined, recipe: Recipe | Module | undefined, argument: T, resultCell: Cell, - ): Cell; + ): Promise>; setup( tx: IExtendedStorageTransaction | undefined, recipeOrModule: Recipe | Module | undefined, argument: T, resultCell: Cell, - ): Cell { + ): Promise> { return this.runner.setup(tx, recipeOrModule, argument, resultCell); } run( diff --git a/packages/shell/src/lib/default-recipe.ts b/packages/shell/src/lib/default-recipe.ts index 5f86def62..c10c7d7fc 100644 --- a/packages/shell/src/lib/default-recipe.ts +++ b/packages/shell/src/lib/default-recipe.ts @@ -14,21 +14,22 @@ export async function create(cc: CharmsController): Promise { ); const charm = await cc.create(recipeContent); - const tx = runtime.edit(); - const charmCell = charm.getCell(); - const sourceCell = charmCell.getSourceCell(processSchema); + const allCharmsCell = await manager.getCellById({ "/": ALL_CHARMS_ID }); - if (!sourceCell) { - // Not sure how/when this happens - throw new Error("Could not create and link default recipe."); - } + await runtime.editWithRetry((tx) => { + const charmCell = charm.getCell(); + const sourceCell = charmCell.getSourceCell(processSchema); - // Get the well-known allCharms cell using its EntityId format - const allCharmsCell = await manager.getCellById({ "/": ALL_CHARMS_ID }); - sourceCell.withTx(tx).key("argument").key("allCharms").set( - allCharmsCell.withTx(tx), - ); - await tx.commit(); + if (!sourceCell) { + // Not sure how/when this happens + throw new Error("Could not create and link default recipe."); + } + + // Get the well-known allCharms cell using its EntityId format + sourceCell.withTx(tx).key("argument").key("allCharms").set( + allCharmsCell.withTx(tx), + ); + }); // Wait for the link to be processed await runtime.idle(); diff --git a/packages/shell/src/lib/iframe-ctx.ts b/packages/shell/src/lib/iframe-ctx.ts index 4b11f194f..4960ef66a 100644 --- a/packages/shell/src/lib/iframe-ctx.ts +++ b/packages/shell/src/lib/iframe-ctx.ts @@ -76,6 +76,9 @@ export const setupIframe = (runtime: Runtime) => ) { const tx = context.runtime.edit(); context.withTx(tx).key(key).set(value); + // No retry, since if we there is a conflict, the iframe will by the + // time this promise resolves have already gotten the base-line truth + // (In other words: It's correct to ignore this edit) tx.commit(); } else { console.warn( diff --git a/packages/toolshed/routes/integrations/google-oauth/google-oauth.utils.ts b/packages/toolshed/routes/integrations/google-oauth/google-oauth.utils.ts index 0858b2d3f..7744f9f09 100644 --- a/packages/toolshed/routes/integrations/google-oauth/google-oauth.utils.ts +++ b/packages/toolshed/routes/integrations/google-oauth/google-oauth.utils.ts @@ -186,12 +186,9 @@ export async function persistTokens( }; // Set the new tokens to the auth cell - const tx = authCell.runtime.edit(); - authCell.withTx(tx).set(tokenData); - tx.commit(); // TODO(seefeld): We don't retry writing this. Should we? - - // Ensure the cell is synced - await runtime.storageManager.synced(); + await authCell.runtime.editWithRetry((tx) => { + authCell.withTx(tx).set(tokenData); + }); return tokenData; } catch (error) { @@ -302,12 +299,9 @@ export async function clearAuthData(authCellDocLink: string) { }; // Set the empty data to the auth cell - const tx = authCell.runtime.edit(); - authCell.withTx(tx).set(emptyAuthData); - tx.commit(); // TODO(seefeld): We don't retry writing this. Should we? - - // Ensure the cell is synced - await runtime.storageManager.synced(); + await authCell.runtime.editWithRetry((tx) => { + authCell.withTx(tx).set(emptyAuthData); + }); return emptyAuthData; } catch (error) { From b786ebcd976dd3fe99ca9b5409834c4d3e89eb12 Mon Sep 17 00:00:00 2001 From: Bernhard Seefeld Date: Fri, 5 Sep 2025 15:18:22 -0700 Subject: [PATCH 02/11] address copilot feedback --- packages/charm/src/manager.ts | 2 ++ packages/runner/src/runner.ts | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/charm/src/manager.ts b/packages/charm/src/manager.ts index a9485cc96..1a3fb0b1a 100644 --- a/packages/charm/src/manager.ts +++ b/packages/charm/src/manager.ts @@ -194,6 +194,8 @@ export class CharmManager { if (newPinnedCharms.length !== this.pinnedCharms.get().length) { this.pinnedCharms.withTx(tx).set(newPinnedCharms); changed = true; + } else { + changed = false; } }) && changed; } diff --git a/packages/runner/src/runner.ts b/packages/runner/src/runner.ts index 246844bab..ea74b4173 100644 --- a/packages/runner/src/runner.ts +++ b/packages/runner/src/runner.ts @@ -98,7 +98,6 @@ export class Runner implements IRunner { } else { return this.runtime.editWithRetry((tx) => { this.setupInternal(tx, recipeOrModule, argument, resultCell); - return resultCell; }).then(() => resultCell); } } From 212a3670dfdd7c70dccde8717ae41fc9d30a1bfc Mon Sep 17 00:00:00 2001 From: Bernhard Seefeld Date: Fri, 5 Sep 2025 15:32:02 -0700 Subject: [PATCH 03/11] add retry to built-ins --- .../runner/src/builtins/compile-and-run.ts | 59 +++++++---------- packages/runner/src/builtins/fetch-data.ts | 32 ++++----- packages/runner/src/builtins/llm.ts | 66 ++++++------------- packages/runner/src/builtins/stream-data.ts | 17 +++-- 4 files changed, 62 insertions(+), 112 deletions(-) diff --git a/packages/runner/src/builtins/compile-and-run.ts b/packages/runner/src/builtins/compile-and-run.ts index 5513fc6b7..231eebc88 100644 --- a/packages/runner/src/builtins/compile-and-run.ts +++ b/packages/runner/src/builtins/compile-and-run.ts @@ -161,55 +161,40 @@ export function compileAndRun( (err) => { if (thisRun !== currentRun) return; - // All this code runside outside the original action, and the - // transaction above might have closed by the time this is called. If - // so, we create a new one to set the error. - const status = tx.status(); - const asyncTx = status.status === "ready" ? tx : runtime.edit(); - - // Extract structured errors if this is a CompilerError - if (err instanceof CompilerError) { - const structuredErrors = err.errors.map((e) => ({ - line: e.line ?? 1, - column: e.column ?? 1, - message: e.message, - type: e.type, - file: e.file, - })); - errorsWithLog.withTx(asyncTx).set(structuredErrors); - } else { - errorWithLog.withTx(asyncTx).set( - err.message + (err.stack ? "\n" + err.stack : ""), - ); - } - if (asyncTx !== tx) asyncTx.commit(); + runtime.editWithRetry((asyncTx) => { + // Extract structured errors if this is a CompilerError + if (err instanceof CompilerError) { + const structuredErrors = err.errors.map((e) => ({ + line: e.line ?? 1, + column: e.column ?? 1, + message: e.message, + type: e.type, + file: e.file, + })); + errors.withTx(asyncTx).set(structuredErrors); + } else { + error.withTx(asyncTx).set( + err.message + (err.stack ? "\n" + err.stack : ""), + ); + } + }); }, ).finally(() => { if (thisRun !== currentRun) return; - // All this code runside outside the original action, and the - // transaction above might have closed by the time this is called. If - // so, we create a new one to set the status. - const status = tx.status(); - const asyncTx = status.status === "ready" ? tx : runtime.edit(); - pendingWithLog.withTx(asyncTx).set(false); - if (asyncTx !== tx) asyncTx.commit(); + runtime.editWithRetry((asyncTx) => { + pending.withTx(asyncTx).set(false); + }); }); - compilePromise.then(async (recipe) => { + compilePromise.then((recipe) => { if (thisRun !== currentRun) return; if (recipe) { // TODO(ja): to support editting of existing charms / running with // inputs from other charms, we will need to think more about // how we pass input into the builtin. - // All this code runside outside the original action, and the - // transaction above might have closed by the time this is called. If - // so, we create a new one to start the charm. - const status = tx.status(); - const asyncTx = status.status === "ready" ? tx : runtime.edit(); - await runtime.runSynced(result.withTx(asyncTx), recipe, input.get()); - if (asyncTx !== tx) asyncTx.commit(); + runtime.runSynced(result, recipe, input.get()); } // TODO(seefeld): Add capturing runtime errors. }); diff --git a/packages/runner/src/builtins/fetch-data.ts b/packages/runner/src/builtins/fetch-data.ts index 45a047cad..d4e36c548 100644 --- a/packages/runner/src/builtins/fetch-data.ts +++ b/packages/runner/src/builtins/fetch-data.ts @@ -127,38 +127,28 @@ export function fetchData( await runtime.idle(); - // All this code runside outside the original action, and the - // transaction above might have closed by the time this is called. If - // so, we create a new one to set the result. - const status = tx.status(); - const asyncTx = status.status === "ready" ? tx : runtime.edit(); - - pendingWithLog.withTx(asyncTx).set(false); - resultWithLog.withTx(asyncTx).set(data); - requestHashWithLog.withTx(asyncTx).set(hash); - - if (asyncTx !== tx) asyncTx.commit(); + await runtime.editWithRetry((tx) => { + pending.withTx(tx).set(false); + result.withTx(tx).set(data); + requestHash.withTx(tx).set(hash); + }); }) .catch(async (err) => { if (thisRun !== currentRun) return; await runtime.idle(); - // All this code runside outside the original action, and the - // transaction above might have closed by the time this is called. If - // so, we create a new one to set the error. - const status = tx.status(); - const asyncTx = status.status === "ready" ? tx : runtime.edit(); - - pendingWithLog.withTx(asyncTx).set(false); - errorWithLog.withTx(asyncTx).set(err); - - if (asyncTx !== tx) asyncTx.commit(); + await runtime.editWithRetry((tx) => { + pending.withTx(tx).set(false); + result.withTx(tx).set(undefined); + error.withTx(tx).set(err); + }); // TODO(seefeld): Not writing now, so we retry the request after failure. // Replace this with more fine-grained retry logic. // requestHash.setAtPath([], hash, log); }); + // Add our cancel to the cancel group addCancel(() => abort.abort()); }; diff --git a/packages/runner/src/builtins/llm.ts b/packages/runner/src/builtins/llm.ts index 2792f6a35..880ea683f 100644 --- a/packages/runner/src/builtins/llm.ts +++ b/packages/runner/src/builtins/llm.ts @@ -156,18 +156,12 @@ export function llm( //normalizeToCells(text, undefined, log); await runtime.idle(); - // All this code runside outside the original action, and the - // transaction above might have closed by the time this is called. If - // so, we create a new one to set the result. - const status = tx.status(); - const asyncTx = status.status === "ready" ? tx : runtime.edit(); - - pendingWithLog.withTx(asyncTx).set(false); - resultWithLog.withTx(asyncTx).set(text); - partialWithLog.withTx(asyncTx).set(text); - requestHashWithLog.withTx(asyncTx).set(hash); - - if (asyncTx !== tx) asyncTx.commit(); + await runtime.editWithRetry((tx) => { + pending.withTx(tx).set(false); + result.withTx(tx).set(text); + partial.withTx(tx).set(text); + requestHash.withTx(tx).set(hash); + }); }) .catch(async (error) => { if (thisRun !== currentRun) return; @@ -176,17 +170,11 @@ export function llm( await runtime.idle(); - // All this code runside outside the original action, and the - // transaction above might have closed by the time this is called. If - // so, we create a new one to set the result. - const status = tx.status(); - const asyncTx = status.status === "ready" ? tx : runtime.edit(); - - pendingWithLog.withTx(asyncTx).set(false); - resultWithLog.withTx(asyncTx).set(undefined); - partialWithLog.withTx(asyncTx).set(undefined); - - if (asyncTx !== tx) asyncTx.commit(); + await runtime.editWithRetry((tx) => { + pending.withTx(tx).set(false); + result.withTx(tx).set(undefined); + partial.withTx(tx).set(undefined); + }); // TODO(seefeld): Not writing now, so we retry the request after failure. // Replace this with more fine-grained retry logic. @@ -330,17 +318,11 @@ export function generateObject>( await runtime.idle(); - // All this code runside outside the original action, and the - // transaction above might have closed by the time this is called. If - // so, we create a new one to set the result. - const status = tx.status(); - const asyncTx = status.status === "ready" ? tx : runtime.edit(); - - pendingWithLog.withTx(asyncTx).set(false); - resultWithLog.withTx(asyncTx).set(response.object); - requestHashWithLog.withTx(asyncTx).set(hash); - - if (asyncTx !== tx) asyncTx.commit(); + await runtime.editWithRetry((tx) => { + pending.withTx(tx).set(false); + result.withTx(tx).set(response.object); + requestHash.withTx(tx).set(hash); + }); }) .catch(async (error) => { if (thisRun !== currentRun) return; @@ -349,17 +331,11 @@ export function generateObject>( await runtime.idle(); - // All this code runside outside the original action, and the - // transaction above might have closed by the time this is called. If - // so, we create a new one to set the result. - const status = tx.status(); - const asyncTx = status.status === "ready" ? tx : runtime.edit(); - - pendingWithLog.withTx(asyncTx).set(false); - resultWithLog.withTx(asyncTx).set({} as any); // FIXME(ja): setting result to undefined causes a storage conflict - partialWithLog.withTx(asyncTx).set(undefined); - - if (asyncTx !== tx) asyncTx.commit(); + await runtime.editWithRetry((tx) => { + pending.withTx(tx).set(false); + result.withTx(tx).set(undefined); + partial.withTx(tx).set(undefined); + }); // TODO(seefeld): Not writing now, so we retry the request after failure. // Replace this with more fine-grained retry logic. diff --git a/packages/runner/src/builtins/stream-data.ts b/packages/runner/src/builtins/stream-data.ts index 30a889675..9d77777ea 100644 --- a/packages/runner/src/builtins/stream-data.ts +++ b/packages/runner/src/builtins/stream-data.ts @@ -152,9 +152,9 @@ export function streamData( await runtime.idle(); - const asyncTx = runtime.edit(); - resultWithLog.withTx(asyncTx).set(parsedData); - await asyncTx.commit(); + await runtime.editWithRetry((tx) => { + result.withTx(tx).set(parsedData); + }); id = undefined; event = undefined; @@ -177,12 +177,11 @@ export function streamData( await runtime.idle(); - const status = tx.status(); - const asyncTx = status.status === "ready" ? tx : runtime.edit(); - pendingWithLog.withTx(asyncTx).set(false); - resultWithLog.withTx(asyncTx).set(undefined); - errorWithLog.withTx(asyncTx).set(e); - if (asyncTx !== tx) asyncTx.commit(); + await runtime.editWithRetry((tx) => { + pending.withTx(tx).set(false); + result.withTx(tx).set(undefined); + error.withTx(tx).set(e); + }); // Allow retrying the same request. previousCall = ""; From f2d06137e0a4d46bc86d40d0a764eac503219e01 Mon Sep 17 00:00:00 2001 From: Bernhard Seefeld Date: Fri, 5 Sep 2025 16:20:29 -0700 Subject: [PATCH 04/11] return error after retries all fail --- packages/charm/src/iterate.ts | 2 +- packages/charm/src/manager.ts | 14 +++++++------- packages/runner/src/cell.ts | 2 +- packages/runner/src/runner.ts | 10 +++++++++- packages/runner/src/runtime.ts | 11 ++++++----- packages/shell/src/lib/iframe-ctx.ts | 6 +++--- 6 files changed, 27 insertions(+), 18 deletions(-) diff --git a/packages/charm/src/iterate.ts b/packages/charm/src/iterate.ts index c9ec709a0..07a60cde2 100644 --- a/packages/charm/src/iterate.ts +++ b/packages/charm/src/iterate.ts @@ -202,7 +202,7 @@ export const generateNewRecipeVersion = async ( llmRequestId, ); - newCharm.runtime.editWithRetry((tx) => { + await newCharm.runtime.editWithRetry((tx) => { newCharm.withTx(tx).getSourceCell(charmSourceCellSchema)?.key("lineage") .push( { diff --git a/packages/charm/src/manager.ts b/packages/charm/src/manager.ts index 1a3fb0b1a..0aa468743 100644 --- a/packages/charm/src/manager.ts +++ b/packages/charm/src/manager.ts @@ -186,7 +186,7 @@ export class CharmManager { let changed = false; await this.syncCharms(this.pinnedCharms); - return await this.runtime.editWithRetry((tx) => { + return (!await this.runtime.editWithRetry((tx) => { const newPinnedCharms = filterOutEntity( this.pinnedCharms.withTx(tx), charmId, @@ -197,7 +197,7 @@ export class CharmManager { } else { changed = false; } - }) && changed; + })) && changed; } async unpin(charm: Cell | string | EntityId) { @@ -250,7 +250,7 @@ export class CharmManager { async emptyTrash() { await this.syncCharms(this.trashedCharms); - return await this.runtime.editWithRetry((tx) => { + await this.runtime.editWithRetry((tx) => { const trashedCharms = this.trashedCharms.withTx(tx); trashedCharms.set([]); }); @@ -809,7 +809,7 @@ export class CharmManager { await this.unpin(idOrCharm); - return await this.runtime.editWithRetry((tx) => { + return (!await this.runtime.editWithRetry((tx) => { // Find the charm in the main list const charm = this.charms.withTx(tx).get().find((c) => isSameEntity(c, id) @@ -833,7 +833,7 @@ export class CharmManager { } success = true; - }) && success; + })) && success; } // Permanently delete a charm (from trash or directly) @@ -845,7 +845,7 @@ export class CharmManager { const id = getEntityId(idOrCharm); if (!id) return false; - return await this.runtime.editWithRetry((tx) => { + return (!await this.runtime.editWithRetry((tx) => { // Remove from trash if present const newTrashedCharms = filterOutEntity( this.trashedCharms.withTx(tx), @@ -855,7 +855,7 @@ export class CharmManager { this.trashedCharms.withTx(tx).set(newTrashedCharms); success = true; } - }) && success; + })) && success; } async runPersistent( diff --git a/packages/runner/src/cell.ts b/packages/runner/src/cell.ts index a4e56a984..20f278a09 100644 --- a/packages/runner/src/cell.ts +++ b/packages/runner/src/cell.ts @@ -815,7 +815,7 @@ function subscribeToReferencedDocs( let cleanup: Cancel | undefined = callback(value); // Technically unnecessary since we don't expect/allow callbacks to sink to - // write to other cells, and we rety by design anyway below when read data + // write to other cells, and we retry by design anyway below when read data // changed. But ideally we enforce read-only as well. tx.commit(); diff --git a/packages/runner/src/runner.ts b/packages/runner/src/runner.ts index ea74b4173..01f47d451 100644 --- a/packages/runner/src/runner.ts +++ b/packages/runner/src/runner.ts @@ -96,6 +96,10 @@ export class Runner implements IRunner { this.setupInternal(providedTx, recipeOrModule, argument, resultCell); return Promise.resolve(resultCell); } else { + // Ignore errors after retrying for now, as outside the tx, we'll see the + // latest true value, it just lost the ract against someone else changing + // the recipe or argument. Correct action is anyhow similar to what would + // have happened if the write succeeded and was immediately overwritten. return this.runtime.editWithRetry((tx) => { this.setupInternal(tx, recipeOrModule, argument, resultCell); }).then(() => resultCell); @@ -477,7 +481,7 @@ export class Runner implements IRunner { resultCell.withTx(givenTx), ); } else { - await this.runtime.editWithRetry((tx) => { + const error = await this.runtime.editWithRetry((tx) => { setupRes = this.setupInternal( tx, recipe, @@ -485,6 +489,10 @@ export class Runner implements IRunner { resultCell.withTx(tx), ); }); + if (error) { + console.error("Error setting up recipe", error); + setupRes = undefined; + } } // If a new recipe was specified, make sure to sync any new cells diff --git a/packages/runner/src/runtime.ts b/packages/runner/src/runtime.ts index fce89e65b..25e69f14e 100644 --- a/packages/runner/src/runtime.ts +++ b/packages/runner/src/runtime.ts @@ -11,10 +11,10 @@ import type { RecipeEnvironment } from "./builder/env.ts"; import { ContextualFlowControl } from "./cfc.ts"; import { setRecipeEnvironment } from "./builder/env.ts"; import type { + CommitError, IExtendedStorageTransaction, IStorageManager, IStorageProvider, - IStorageSubscriptionCapability, MemorySpace, } from "./storage/interface.ts"; import { type Cell, createCell } from "./cell.ts"; @@ -109,7 +109,7 @@ export interface IRuntime { editWithRetry( fn: (tx: IExtendedStorageTransaction) => void, maxRetries?: number, - ): Promise; + ): Promise; readTx(tx?: IExtendedStorageTransaction): IExtendedStorageTransaction; // Cell factory methods @@ -436,17 +436,18 @@ export class Runtime implements IRuntime { editWithRetry( fn: (tx: IExtendedStorageTransaction) => void, maxRetries: number = DEFAULT_MAX_RETRIES, - ): Promise { + ): Promise { const tx = this.edit(); fn(tx); return tx.commit().then(({ error }) => { if (error) { if (maxRetries > 0) { return this.editWithRetry(fn, maxRetries - 1); + } else { + return error; } - return false; } - return true; + return undefined; }); } diff --git a/packages/shell/src/lib/iframe-ctx.ts b/packages/shell/src/lib/iframe-ctx.ts index 4960ef66a..11a197904 100644 --- a/packages/shell/src/lib/iframe-ctx.ts +++ b/packages/shell/src/lib/iframe-ctx.ts @@ -76,9 +76,9 @@ export const setupIframe = (runtime: Runtime) => ) { const tx = context.runtime.edit(); context.withTx(tx).key(key).set(value); - // No retry, since if we there is a conflict, the iframe will by the - // time this promise resolves have already gotten the base-line truth - // (In other words: It's correct to ignore this edit) + // No retry, since if there is a conflict, the iframe will by the time + // this promise resolves have already gotten the base-line truth (In + // other words: It's correct to ignore this edit) tx.commit(); } else { console.warn( From 957f0547b6008e7bc59b3de0dc5df9bd0d35c2e2 Mon Sep 17 00:00:00 2001 From: Bernhard Seefeld Date: Mon, 8 Sep 2025 09:45:10 -0700 Subject: [PATCH 05/11] compare length on pending tx (shouldn't make a difference here, since it's not modified, but cleaner) --- packages/charm/src/manager.ts | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/packages/charm/src/manager.ts b/packages/charm/src/manager.ts index 0aa468743..c350d6d53 100644 --- a/packages/charm/src/manager.ts +++ b/packages/charm/src/manager.ts @@ -187,11 +187,9 @@ export class CharmManager { await this.syncCharms(this.pinnedCharms); return (!await this.runtime.editWithRetry((tx) => { - const newPinnedCharms = filterOutEntity( - this.pinnedCharms.withTx(tx), - charmId, - ); - if (newPinnedCharms.length !== this.pinnedCharms.get().length) { + const pinnedCharms = this.pinnedCharms.withTx(tx); + const newPinnedCharms = filterOutEntity(pinnedCharms, charmId); + if (newPinnedCharms.length !== pinnedCharms.get().length) { this.pinnedCharms.withTx(tx).set(newPinnedCharms); changed = true; } else { From 7089d8b3d69567226e03104e77e610b9a21f354b Mon Sep 17 00:00:00 2001 From: Bernhard Seefeld Date: Mon, 8 Sep 2025 09:50:32 -0700 Subject: [PATCH 06/11] more correct length comparision & fixing returning errors --- packages/charm/src/manager.ts | 55 +++++++++++++++++------------------ 1 file changed, 26 insertions(+), 29 deletions(-) diff --git a/packages/charm/src/manager.ts b/packages/charm/src/manager.ts index c350d6d53..2590301f2 100644 --- a/packages/charm/src/manager.ts +++ b/packages/charm/src/manager.ts @@ -219,16 +219,14 @@ export class CharmManager { await this.syncCharms(this.trashedCharms); await this.syncCharms(this.charms); - let trashedCharm: Cell | undefined; - - this.runtime.editWithRetry((tx) => { + const error = await this.runtime.editWithRetry((tx) => { const trashedCharms = this.trashedCharms.withTx(tx); const id = getEntityId(idOrCharm); if (!id) return false; // Find the charm in trash - trashedCharm = trashedCharms.get().find((charm) => + const trashedCharm = trashedCharms.get().find((charm) => isSameEntity(charm, id) ); @@ -243,7 +241,8 @@ export class CharmManager { }); await this.runtime.idle(); - return true; + + return !error; } async emptyTrash() { @@ -808,35 +807,33 @@ export class CharmManager { await this.unpin(idOrCharm); return (!await this.runtime.editWithRetry((tx) => { + const charms = this.charms.withTx(tx); + const trashedCharms = this.trashedCharms.withTx(tx); + // Find the charm in the main list - const charm = this.charms.withTx(tx).get().find((c) => - isSameEntity(c, id) - ); + const charm = charms.get().find((c) => isSameEntity(c, id)); if (!charm) { success = false; - return; - } + } else { + // Move to trash if not already there + if (!trashedCharms.get().some((c) => isSameEntity(c, id))) { + trashedCharms.push(charm); + } - // Move to trash if not already there - if ( - !this.trashedCharms.withTx(tx).get().some((c) => isSameEntity(c, id)) - ) { - this.trashedCharms.withTx(tx).push(charm); - } + // Remove from main list + const newCharms = filterOutEntity(charms, id); + if (newCharms.length !== charms.get().length) { + charms.set(newCharms); + } - // Remove from main list - const newCharms = filterOutEntity(this.charms.withTx(tx), id); - if (newCharms.length !== this.charms.get().length) { - this.charms.withTx(tx).set(newCharms); + success = true; } - - success = true; })) && success; } // Permanently delete a charm (from trash or directly) async permanentlyDelete(idOrCharm: string | EntityId | Cell) { - let success = false; + let success; await this.syncCharms(this.trashedCharms); @@ -845,13 +842,13 @@ export class CharmManager { return (!await this.runtime.editWithRetry((tx) => { // Remove from trash if present - const newTrashedCharms = filterOutEntity( - this.trashedCharms.withTx(tx), - id, - ); - if (newTrashedCharms.length !== this.trashedCharms.get().length) { - this.trashedCharms.withTx(tx).set(newTrashedCharms); + const trashedCharms = this.trashedCharms.withTx(tx); + const newTrashedCharms = filterOutEntity(trashedCharms, id); + if (newTrashedCharms.length !== trashedCharms.get().length) { + trashedCharms.set(newTrashedCharms); success = true; + } else { + success = false; } })) && success; } From 95efb83fc1af93dd1ac3998c29e11c9f47ce799b Mon Sep 17 00:00:00 2001 From: Bernhard Seefeld Date: Mon, 8 Sep 2025 09:57:13 -0700 Subject: [PATCH 07/11] throw error if we can't persist the token --- .../integrations/google-oauth/google-oauth.utils.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/packages/toolshed/routes/integrations/google-oauth/google-oauth.utils.ts b/packages/toolshed/routes/integrations/google-oauth/google-oauth.utils.ts index 7744f9f09..cb09e4c20 100644 --- a/packages/toolshed/routes/integrations/google-oauth/google-oauth.utils.ts +++ b/packages/toolshed/routes/integrations/google-oauth/google-oauth.utils.ts @@ -1,9 +1,11 @@ import { OAuth2Client, Tokens } from "@cmd-johnson/oauth2-client"; +import { getLogger } from "@commontools/utils/logger"; import env from "@/env.ts"; import { runtime } from "@/index.ts"; import { Context } from "@hono/hono"; import { AuthSchema, Mutable, Schema } from "@commontools/runner"; -// Types + +const logger = getLogger("google-oauth.utils"); export interface OAuth2Tokens { accessToken: string; @@ -186,12 +188,14 @@ export async function persistTokens( }; // Set the new tokens to the auth cell - await authCell.runtime.editWithRetry((tx) => { + const error = await authCell.runtime.editWithRetry((tx) => { authCell.withTx(tx).set(tokenData); }); + if (error) throw error; return tokenData; } catch (error) { + logger.error("Error persisting tokens", error); throw new Error(`Error persisting tokens: ${error}`); } } From 25d77ef80dc3bbe6c14ea88dbb75c722349c57ac Mon Sep 17 00:00:00 2001 From: Bernhard Seefeld Date: Mon, 8 Sep 2025 09:57:59 -0700 Subject: [PATCH 08/11] also here --- .../routes/integrations/google-oauth/google-oauth.utils.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/toolshed/routes/integrations/google-oauth/google-oauth.utils.ts b/packages/toolshed/routes/integrations/google-oauth/google-oauth.utils.ts index cb09e4c20..3f9f1586f 100644 --- a/packages/toolshed/routes/integrations/google-oauth/google-oauth.utils.ts +++ b/packages/toolshed/routes/integrations/google-oauth/google-oauth.utils.ts @@ -303,12 +303,14 @@ export async function clearAuthData(authCellDocLink: string) { }; // Set the empty data to the auth cell - await authCell.runtime.editWithRetry((tx) => { + const error = await authCell.runtime.editWithRetry((tx) => { authCell.withTx(tx).set(emptyAuthData); }); + if (error) throw error; return emptyAuthData; } catch (error) { + logger.error("Error clearing auth data", error); throw new Error(`Error clearing auth data: ${error}`); } } From 70229b4ba2881f3c237c1fd3a989bf877cb244b0 Mon Sep 17 00:00:00 2001 From: Bernhard Seefeld Date: Mon, 8 Sep 2025 10:57:53 -0700 Subject: [PATCH 09/11] fix test after changing editWithRetry signature --- .../runner/test/runtime-edit-with-retry.test.ts | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/packages/runner/test/runtime-edit-with-retry.test.ts b/packages/runner/test/runtime-edit-with-retry.test.ts index e0b587fa3..f16fb96e2 100644 --- a/packages/runner/test/runtime-edit-with-retry.test.ts +++ b/packages/runner/test/runtime-edit-with-retry.test.ts @@ -35,11 +35,11 @@ describe("Runtime.editWithRetry", () => { cell.set(0); await tx.commit(); - const ok = await runtime.editWithRetry((t) => { + const error = await runtime.editWithRetry((t) => { cell.withTx(t).send(1); }); - expect(ok).toBe(true); + expect(error).toBeUndefined(); expect(cell.get()).toBe(1); }); @@ -56,7 +56,7 @@ describe("Runtime.editWithRetry", () => { // Track attempts and force early aborts to trigger retry let attempts = 0; - const ok = await runtime.editWithRetry((t) => { + const error = await runtime.editWithRetry((t) => { attempts++; // Abort the first few attempts to force retry if (attempts <= 2) { @@ -66,7 +66,7 @@ describe("Runtime.editWithRetry", () => { cell.withTx(t).send(2); }, 5); - expect(ok).toBe(true); + expect(error).toBeUndefined(); expect(attempts).toBe(3); // initial + 2 retries expect(cell.get()).toBe(2); }); @@ -83,12 +83,12 @@ describe("Runtime.editWithRetry", () => { let attempts = 0; const max = 3; - const ok = await runtime.editWithRetry((t) => { + const error = await runtime.editWithRetry((t) => { attempts++; t.abort("always-fail"); }, max); - expect(ok).toBe(false); + expect(error).toBeDefined(); // initial + max retries expect(attempts).toBe(max + 1); // Value unchanged @@ -106,12 +106,12 @@ describe("Runtime.editWithRetry", () => { await tx.commit(); let attempts = 0; - const ok = await runtime.editWithRetry((t) => { + const error = await runtime.editWithRetry((t) => { attempts++; t.abort("always-fail"); }); - expect(ok).toBe(false); + expect(error).toBeDefined(); expect(attempts).toBe(DEFAULT_MAX_RETRIES + 1); expect(cell.get()).toBe(0); }); From 5d0e2425a158a06c3fb1fe0ddb06f2dadc99d545 Mon Sep 17 00:00:00 2001 From: Bernhard Seefeld Date: Mon, 8 Sep 2025 10:59:10 -0700 Subject: [PATCH 10/11] log errors via logger, not console --- packages/runner/src/runner.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/runner/src/runner.ts b/packages/runner/src/runner.ts index 01f47d451..8e7f74e8f 100644 --- a/packages/runner/src/runner.ts +++ b/packages/runner/src/runner.ts @@ -490,7 +490,7 @@ export class Runner implements IRunner { ); }); if (error) { - console.error("Error setting up recipe", error); + logger.error("Error setting up recipe", error); setupRes = undefined; } } From 4d918aa57780a865a4b2607c97ed0fface0ebd33 Mon Sep 17 00:00:00 2001 From: Bernhard Seefeld Date: Mon, 8 Sep 2025 11:43:47 -0700 Subject: [PATCH 11/11] fix wrong nesting of tx --- packages/runner/src/runner.ts | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/packages/runner/src/runner.ts b/packages/runner/src/runner.ts index 8e7f74e8f..97f0fc025 100644 --- a/packages/runner/src/runner.ts +++ b/packages/runner/src/runner.ts @@ -469,7 +469,6 @@ export class Runner implements IRunner { // scheduler if the transaction isn't committed before the first functions // run. Though most likely the worst case is just extra invocations. const givenTx = resultCell.tx?.status().status === "ready" && resultCell.tx; - const tx = givenTx || this.runtime.edit(); let setupRes: ReturnType | undefined; if (givenTx) { // If tx is given, i.e. result cell was part of a tx that is still open, @@ -497,11 +496,21 @@ export class Runner implements IRunner { // If a new recipe was specified, make sure to sync any new cells if (recipe || !synced) { - await this.syncCellsForRunningRecipe(resultCell.withTx(tx), recipe); + await this.syncCellsForRunningRecipe(resultCell, recipe); } if (setupRes?.needsStart) { + const tx = givenTx || this.runtime.edit(); this.startWithTx(tx, resultCell.withTx(tx), setupRes.recipe); + if (!givenTx) { + // Should be unnecessary as the start itself is read-only + // TODO(seefeld): Enforce this by adding a read-only flag for tx + await tx.commit().then((error) => { + if (error) { + logger.error("Error committing transaction", error); + } + }); + } } return recipe?.resultSchema