Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions packages/runner/src/builtins/compile-and-run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ export function compileAndRun(
// All this code runside outside the original action, and the
// transaction above might have closed by the time this is called. If
// so, we create a new one to set the error.
const asyncTx = tx.status().ok?.open ? tx : runtime.edit();
const status = tx.status();
const asyncTx = status.status === "ready" ? tx : runtime.edit();

// Extract structured errors if this is a CompilerError
if (err instanceof CompilerError) {
Expand All @@ -189,7 +190,8 @@ export function compileAndRun(
// All this code runside outside the original action, and the
// transaction above might have closed by the time this is called. If
// so, we create a new one to set the status.
const asyncTx = tx.status().ok?.open ? tx : runtime.edit();
const status = tx.status();
const asyncTx = status.status === "ready" ? tx : runtime.edit();
pendingWithLog.withTx(asyncTx).set(false);
if (asyncTx !== tx) asyncTx.commit();
});
Expand All @@ -204,7 +206,8 @@ export function compileAndRun(
// All this code runside outside the original action, and the
// transaction above might have closed by the time this is called. If
// so, we create a new one to start the charm.
const asyncTx = tx.status().ok?.open ? tx : runtime.edit();
const status = tx.status();
const asyncTx = status.status === "ready" ? tx : runtime.edit();
await runtime.runSynced(result.withTx(asyncTx), recipe, input.get());
if (asyncTx !== tx) asyncTx.commit();
}
Expand Down
6 changes: 4 additions & 2 deletions packages/runner/src/builtins/fetch-data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ export function fetchData(
// All this code runside outside the original action, and the
// transaction above might have closed by the time this is called. If
// so, we create a new one to set the result.
const asyncTx = tx.status().ok?.open ? tx : runtime.edit();
const status = tx.status();
const asyncTx = status.status === "ready" ? tx : runtime.edit();

pendingWithLog.withTx(asyncTx).set(false);
resultWithLog.withTx(asyncTx).set(data);
Expand All @@ -145,7 +146,8 @@ export function fetchData(
// All this code runside outside the original action, and the
// transaction above might have closed by the time this is called. If
// so, we create a new one to set the error.
const asyncTx = tx.status().ok?.open ? tx : runtime.edit();
const status = tx.status();
const asyncTx = status.status === "ready" ? tx : runtime.edit();

pendingWithLog.withTx(asyncTx).set(false);
errorWithLog.withTx(asyncTx).set(err);
Expand Down
12 changes: 8 additions & 4 deletions packages/runner/src/builtins/llm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ export function llm(
// All this code runside outside the original action, and the
// transaction above might have closed by the time this is called. If
// so, we create a new one to set the result.
const asyncTx = tx.status().ok?.open ? tx : runtime.edit();
const status = tx.status();
const asyncTx = status.status === "ready" ? tx : runtime.edit();

pendingWithLog.withTx(asyncTx).set(false);
resultWithLog.withTx(asyncTx).set(text);
Expand All @@ -177,7 +178,8 @@ export function llm(
// All this code runside outside the original action, and the
// transaction above might have closed by the time this is called. If
// so, we create a new one to set the result.
const asyncTx = tx.status().ok?.open ? tx : runtime.edit();
const status = tx.status();
const asyncTx = status.status === "ready" ? tx : runtime.edit();

pendingWithLog.withTx(asyncTx).set(false);
resultWithLog.withTx(asyncTx).set(undefined);
Expand Down Expand Up @@ -330,7 +332,8 @@ export function generateObject<T extends Record<string, unknown>>(
// All this code runside outside the original action, and the
// transaction above might have closed by the time this is called. If
// so, we create a new one to set the result.
const asyncTx = tx.status().ok?.open ? tx : runtime.edit();
const status = tx.status();
const asyncTx = status.status === "ready" ? tx : runtime.edit();

pendingWithLog.withTx(asyncTx).set(false);
resultWithLog.withTx(asyncTx).set(response.object);
Expand All @@ -348,7 +351,8 @@ export function generateObject<T extends Record<string, unknown>>(
// All this code runside outside the original action, and the
// transaction above might have closed by the time this is called. If
// so, we create a new one to set the result.
const asyncTx = tx.status().ok?.open ? tx : runtime.edit();
const status = tx.status();
const asyncTx = status.status === "ready" ? tx : runtime.edit();

pendingWithLog.withTx(asyncTx).set(false);
resultWithLog.withTx(asyncTx).set({} as any); // FIXME(ja): setting result to undefined causes a storage conflict
Expand Down
3 changes: 2 additions & 1 deletion packages/runner/src/builtins/stream-data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,8 @@ export function streamData(

await runtime.idle();

const asyncTx = tx.status().ok?.open ? tx : runtime.edit();
const status = tx.status();
const asyncTx = status.status === "ready" ? tx : runtime.edit();
pendingWithLog.withTx(asyncTx).set(false);
resultWithLog.withTx(asyncTx).set(undefined);
errorWithLog.withTx(asyncTx).set(e);
Expand Down
3 changes: 2 additions & 1 deletion packages/runner/src/query-result-proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ export function createQueryResultProxy<T>(
}

// Resolve path and follow links to actual value.
const readTx = tx?.status().ok?.open ? tx : runtime.edit();
const txStatus = tx?.status();
const readTx = (txStatus?.status === "ready" && tx) ? tx : runtime.edit();
link = resolveLinkToValue(readTx, link);
const target = readTx.readValueOrThrow(link) as any;

Expand Down
2 changes: 1 addition & 1 deletion packages/runner/src/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ export class Runner implements IRunner {
// scheduler if the transaction isn't committed before the first functions
// run. Though most likely the worst case is just extra invocations.
this.run(
resultCell.tx?.status().ok?.open ? resultCell.tx : undefined,
resultCell.tx?.status().status === "ready" ? resultCell.tx : undefined,
recipe,
inputs,
resultCell,
Expand Down
18 changes: 11 additions & 7 deletions packages/runner/src/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -446,17 +446,21 @@ export function txToReactivityLog(
tx: IExtendedStorageTransaction,
): ReactivityLog {
const log: ReactivityLog = { reads: [], writes: [] };
for (const change of tx.log()) {
if ("read" in change) {
for (const activity of tx.journal.activity()) {
if ("read" in activity && activity.read) {
log.reads.push({
...change.read!.address,
path: change.read!.address.path.slice(1), // Remove the "value" prefix
space: activity.read.space,
id: activity.read.id,
type: activity.read.type,
path: activity.read.path.slice(1), // Remove the "value" prefix
});
}
if ("write" in change) {
if ("write" in activity && activity.write) {
log.writes.push({
...change.write!.address,
path: change.write!.address.path.slice(1),
space: activity.write.space,
id: activity.write.id,
type: activity.write.type,
path: activity.write.path.slice(1),
});
}
}
Expand Down
2 changes: 1 addition & 1 deletion packages/runner/src/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ export function validateAndTransform(
// from the tx. Once tx.commit() is called, all that data is either available
// via other transactions or has been rolled back. Either way, we want to
// reflect that reality.
if (!tx?.status().ok?.open) tx = undefined;
if (tx?.status().status !== "ready") tx = undefined;

// Reconstruct doc, path, schema, rootSchema from link and runtime
const schema = link.schema;
Expand Down
117 changes: 22 additions & 95 deletions packages/runner/src/storage/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,6 @@ export type Read = IAttestation;
*/
export type Write = IAttestation;

export interface IStorageTransactionInvariant {
read?: IStorageInvariant;
write?: IStorageInvariant;
}

export interface IStorageTransactionLog {
get(address: IMemorySpaceAddress): IStorageTransactionInvariant;
addRead(read: IStorageInvariant): void;
addWrite(write: IStorageInvariant): void;
[Symbol.iterator](): Iterator<IStorageTransactionInvariant>;
}

// This type is used to tag a document with any important metadata.
// Currently, the only supported type is the classification.
export type Labels = {
Expand Down Expand Up @@ -371,20 +359,15 @@ export interface IMemoryChange {
after: JSONValue | undefined;
}

export type IStorageTransactionProgress = Variant<{
open: IStorageTransactionLog;
pending: IStorageTransactionLog;
done: IStorageTransactionLog;
}>;
export type StorageTransactionStatus = Result<
IStorageTransactionState,
StorageTransactionFailed
>;

export type IStorageTransactionState =
export type StorageTransactionStatus =
| { status: "ready"; journal: ITransactionJournal }
| { status: "pending"; journal: ITransactionJournal }
| { status: "done"; journal: ITransactionJournal };
| { status: "done"; journal: ITransactionJournal }
| {
status: "error";
journal: ITransactionJournal;
error: StorageTransactionFailed;
};

/**
* Representation of a storage transaction, which can be used to query facts and
Expand All @@ -400,19 +383,22 @@ export type IStorageTransactionState =
*/
export interface IStorageTransaction {
/**
* Describes current status of the transaction. If transaction has failed
* or was cancelled result will be an error with a corresponding error variant.
* If transaction is being built it will have `open` status, if commit was
* called but promise has not resolved yet it will be `pending`. If commit
* successfully completed it will be `done`.
*
* Please note that if storage was updated since transaction was created such
* that any of the invariants have changed status will be change to
* `IStorageConsistencyError` even though transaction has not being commited.
* This allows transactor to cancel and recreate transaction with a current
* state without having to build up a whole transaction and commiting it.
* The transaction journal containing all read and write activities.
* Provides access to transaction operations and dependency tracking.
*/
readonly journal: ITransactionJournal;

/**
* Describes current status of the transaction. Returns a union type with
* status field indicating the current state:
* - `"ready"`: Transaction is being built and ready for operations
* - `"pending"`: Commit was called but promise has not resolved yet
* - `"done"`: Commit successfully completed
* - `"error"`: Transaction has failed or was cancelled, includes error details

* Each status variant includes a `journal` field with transaction operations.
*/
// status(): StorageTransactionStatus;
status(): StorageTransactionStatus;

/**
* Helper that is the same as `reader().read()` but more convenient, as it
Expand Down Expand Up @@ -495,21 +481,6 @@ export interface IStorageTransaction {
}

export interface IExtendedStorageTransaction extends IStorageTransaction {
/**
* Describes current status of the transaction. If transaction has failed
* or was cancelled result will be an error with a corresponding error variant.
* If transaction is being built it will have `open` status, if commit was
* called but promise has not resolved yet it will be `pending`. If commit
* successfully completed it will be `done`.
*
* Please note that if storage was updated since transaction was created such
* that any of the invariants have changed status will be change to
* `IStorageConsistencyError` even though transaction has not being commited.
* This allows transactor to cancel and recreate transaction with a current
* state without having to build up a whole transaction and commiting it.
*/
status(): Result<IStorageTransactionProgress, StorageTransactionFailed>;

/**
* Reads a value from a (local) memory address and throws on error, except for
* `NotFoundError` which is returned as undefined.
Expand Down Expand Up @@ -556,19 +527,6 @@ export interface IExtendedStorageTransaction extends IStorageTransaction {
value: JSONValue | undefined,
): void;

/**
* Returns the log of the transaction.
*
* The log is a list of changes that have been made to the transaction.
* It is used to track the dependencies of the transaction.
*
* If the transaction is aborted, the log reflects the attempted reads and
* writes. If the transaction is committed, the log reflects the actual reads
* and writes.
*
* @deprecated
*/
log(): IStorageTransactionLog;
}

export interface ITransactionReader {
Expand Down Expand Up @@ -842,32 +800,6 @@ export interface ITransactionJournal {

novelty(space: MemorySpace): Iterable<IAttestation>;
history(space: MemorySpace): Iterable<IAttestation>;

reader(
space: MemorySpace,
): Result<ITransactionReader, InactiveTransactionError>;

writer(
space: MemorySpace,
): Result<ITransactionWriter, InactiveTransactionError>;

/**
* Closes underlying transaction, making it non-editable going forward. Any
* attempts to edit it will fail.
*/
close(): Result<Map<MemorySpace, ITransaction>, InactiveTransactionError>;

/**
* Aborts underlying transaction, making it non-editable going forward. Any
* attempts to edit it will fail.
*/
abort(reason?: unknown): Result<Unit, InactiveTransactionError>;
}

export interface EditableJournal {
activity(): Iterable<Activity>;
novelty: Iterable<IAttestation>;
history(): Iterable<IAttestation>;
}

export interface ITransaction {
Expand Down Expand Up @@ -929,8 +861,3 @@ export interface IAttestation {
readonly address: IMemoryAddress;
readonly value?: JSONValue;
}

export interface IStorageInvariant {
readonly address: IMemorySpaceAddress;
readonly value?: JSONValue;
}
Loading