Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 158 additions & 79 deletions packages/runner/src/storage/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ import type {
Commit,
ConflictError,
ConnectionError,
Entity,
Entity as URI,
JSONValue,
MemorySpace,
Reference,
Result,
SchemaContext,
State,
The,
The as MediaType,
TransactionError,
Unit,
Variant,
Expand Down Expand Up @@ -100,19 +100,13 @@ export interface IStorageProvider {
getReplica(): string | undefined;
}

/**
* This is successor to the current `IStorageProvider` which provides a
* transactional interface.
*/
export interface IStorageProviderV2 {
export interface IStorageManagerV2 {
/**
* Creates a new transaction that can be used to build up a change set that
* can be committed transactionally. It ensures that all reads are consistent
* and no affecting changes takes place until the transaction is committed. If
* upstream changes are made since transaction is created that updates any of
* the read values transaction will fail on commit.
* Creates a storage transaction that can be used to read / write data into
* locally replicated memory spaces. Transaction allows reading from many
* multiple spaces but writing only to one space.
*/
fork(): IStorageTransaction;
edit(): IStorageTransaction;
}

/**
Expand All @@ -122,38 +116,12 @@ export interface IStorageProviderV2 {
* lifetime by notifying pending transaction of every change that is integrated
* into the storage, if changes affect any data read through a transaction
* lifecycle it can not be committed because it would violate consistency. If
* no change occurs or changes do not affect any data read it would not affect
* transaction consistency guarantees and therefor committing transaction will
* be send to the upstream storage provider which will either accept if no
* invariants have being invalidated in the meantime or rejected and fail commit.
* no change occurs or changes do not affect any data reading it would not
* affect transaction consistency guarantees and therefor committing transaction
* will send it to an upstream storage provider which will either accept, if no
* invariants have being invalidated, or reject and fail commit.
*/
export interface IStorageTransaction {
/**
* Transaction can be cancelled which causes storage provider to stop keeping
* it up to date with incoming changes. Cancelled transactions will produce
* {@link IStorageTransactionAbortedIStorageTransactionAborted} on commit. Cancelling transaction
* may produce an error if transaction has already being committed. If reason
* is omitted `Unit` will be used.
*/
abort(reason?: Unit): Result<Unit, IStorageTransactionClosed>;

/**
* Commit the transaction. If the transaction has been aborted, this will
* produce `IStorageTransactionAborted`. If transaction has being
* invalidated while it was in progress, this will produce `IStorageConsistencyError`.
* If state has changed upstream `ConflictError` will be produced. If signing
* authority has no necessary permissions `UnauthorizedError` will be produced.
* If connection with remote can not be reastablished `ConnectionError` is
* produced. If remote can not perform transaction for any other reason like
* underlying DB problem `TransactionError` will be produced.
*
* Commiting failed transaction will have no effect and same return will be
* produced. This is not an ideal especially in the case of `ConnectionError`
* or `TransactionError`, however it is pragmatic choice allowing storage to
* drop transactions as opposed to keeping them around indefinitely.
*/
commit(): Promise<Result<Unit, IStorageTransactionError>>;

/**
* Describes current status of the transaction. If transaction has failed
* or was cancelled result will be an error with a corresponding error variant.
Expand All @@ -172,64 +140,109 @@ export interface IStorageTransaction {
IStorageTransactionError
>;

/**
* Creates a memory space reader for inside this transaction. Fails if
* transaction is no longer in progress. Requesting a reader for the same
* memory space will return same reader instance.
*/
reader(space: MemorySpace): Result<ITransactionReader, IReaderError>;

/**
* Creates a memory space writer for this transaction. Fails if transaction is
* no longer in progress or if writer for the different space was already open
* on this transaction. Requesting a writer for the same memory space will
* return same writer instance.
*/
writer(space: MemorySpace): Result<ITransactionWriter, IWriterError>;

/**
* Transaction can be cancelled which causes storage provider to stop keeping
* it up to date with incoming changes. Aborting inactive transactions will
* produce {@link InactiveTransactionError}. Aborted transactions will produce
* {@link IStorageTransactionAborted} error on attempt to commit.
*/
abort(reason?: Unit): Result<Unit, InactiveTransactionError>;

/**
* Commits transaction. If transaction is no longer active, this will
* produce {@link IStorageTransactionAborted}. If transaction consistency
* gurantees have being violated by upstream changes
* {@link IStorageTransactionInconsistent} is returned.
*
* If transaction is still active and no consistency guarantees have being
* invalidated it will be send upstream and status will be updated to
* `pending`. Transaction may still fail with {@link IStorageTransactionFailed}
* if state upstream affects values read from updated space have changed,
* which can happen if another client concurrently updates them. Transaction
* MAY also fail due to insufficient authorization level or due to various IO
* problems.
*
* Commit is idempotent, meaning calling it over and over will return same
* exact value as on first call and no execution will take place on subsequent
* calls.
*/
commit(): Promise<Result<Unit, IStorageTransactionError>>;
}

export interface ITransactionReader {
/**
* Reads a value from a (local) memory address and captures corresponding
* `Read` in the the transaction invariants. If value was written in read memory
* address in this transaction read will return value that was written as opposed
* to value stored.
* `Read` in the transaction invariants. If value was written in read memory
* address in this transaction read will return value that was written as
* opposed to value stored.
*
* Read will fail with `IStorageTransactionError` if transaction has an error state.
* Read will fail with `IStorageTransactionClosed` if transaction is done.
* Read will fail with `INotFoundError` record in the given address does not exist
* in (local) memory.
* Read will fail with `InactiveTransactionError` if transaction is no longer
* active.
*
* Read will fail with `INotFoundError` record in the given address does not exist
* but `Read` operation is still added to the transaction invariants as transactor
* assumes non existence of the record.
* Read will fail with `INotFoundError` when reading inside a memory address
* that does not exist in local replica. The `Read` invariant is still
* captured however to ensure that assumption about non existence is upheld.
*
* ```ts
* const w = tx.write({ the, of, at: [] }, {
* const w = tx.write({ type, id, path: [] }, {
* title: "Hello world",
* content: [
* { text: "Beautiful day", format: "bold" }
* ]
* })
* assert(w.ok)
*
* assert(tx.read({ the, of, at: ['author'] }).ok === undefined)
* assert(tx.read({ the, of, at: ['author', 'address'] }).error.name === 'NotFoundError')
* assert(tx.read({ type, id, path: ['author'] }).ok === undefined)
* assert(tx.read({ type, id, path: ['author', 'address'] }).error.name === 'NotFoundError')
* // JS specific getters are not supported
* assert(tx.read({ the, of, at: ['content', 'length'] }).ok.is === undefined)
* assert(tx.read({ the, of, at: ['title'] }).ok.is === "Hello world")
* assert(tx.read({ type, id, path: ['content', 'length'] }).ok.is === undefined)
* assert(tx.read({ type, id, path: ['title'] }).ok.is === "Hello world")
* // Referencing non-existing facts produces errors
* assert(tx.read({ the: 'bad/mime' , of, at: ['author'] }).error.name === 'NotFoundError')
* assert(tx.read({ type: 'bad/mime' , id, path: ['author'] }).error.name === 'NotFoundError')
* ```
*/
read(
address: IStorageAddress,
address: IMemoryAddress,
): Result<
Read,
INotFoundError | IStorageTransactionError | IStorageTransactionClosed
INotFoundError | InactiveTransactionError
>;
}

export interface ITransactionWriter extends ITransactionReader {
/**
* Write a value into a storage at a given address & captures it in the
* transaction invariants. Write will fail with `IStorageTransactionError`
* if transaction has an error state. Write will fail with
* `IStorageTransactionClosed` if transaction is done.
*/
write(
address: IStorageAddress,
address: IMemoryAddress,
value?: JSONValue,
): Result<Write, IStorageTransactionError | IStorageTransactionClosed>;
): Result<Write, InactiveTransactionError>;
}

/**
* This is transaction representation from the storage perpective. It will not
* be exposed outside of the storage provider intenals and is designed to allow
* storage provider to maintain consistency guarantees.
*/
export interface IStorageOpenTransaction {
export interface IStorageTransactionConsistencyMaintenance {
/**
* This is an internal method called by a storage provider that lets
* transaction know about potential invariant changes. Transaction can track
Expand All @@ -245,6 +258,7 @@ export interface IStorageOpenTransaction {
* aborted.
*/
export interface IStorageTransactionAborted extends Error {
name: "StorageTransactionAborted";
/**
* Reason provided when transaction was aborted.
*/
Expand All @@ -255,51 +269,116 @@ export interface IStorageTransactionAborted extends Error {
* Error indicates that transaction consistency guarantees have being
* invalidated - some fact has changed while transaction was in progress.
*/
export interface IStorageConsistencyError extends Error {}
export interface IStorageTransactionInconsistent extends Error {
name: "StorageTransactionInconsistent";
}

/**
* Error that indicating that no change could be made to a transaction is it is
* no longer active.
*/
export type InactiveTransactionError =
| IStorageTransactionInconsistent
| IStorageTransactionAborted
| IStorageTransactionFailed
| IStorageTransactionComplete;

export type IStorageTransactionError =
| IStorageTransactionAborted
| IStorageTransactionInconsistent
| IStorageTransactionFailed;

export type IStorageTransactionFailed =
| ConflictError
| TransactionError
| ConnectionError
| AuthorizationError;

export interface IStorageTransactionClosed extends Error {}
export type IReaderError =
| IStorageTransactionComplete
| IStorageTransactionAborted;

export type IWriterError =
| IStorageTransactionComplete
| IStorageTransactionAborted
| IStorageTransactionInconsistent
| IStorageTransactionWriteIsolationError;

export interface IStorageTransactionComplete extends Error {
name: "StorageTransactionCompleteError";
}
export interface INotFoundError extends Error {}
export type IStorageTransactionProgress = Variant<{
open: IStorageTransactionLog;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clarification: to entries move from open to pending to done?

if i want to know all, no matter the status (as scheduler needs it), should it iterate over all of them? who else is going to call this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Idea has being that they all contain same transaction log, it's just transaction itself is either open, pending or done. In the implementation I renamed open to edit to make it more clear that transaction is editable while when pending or done it's no longer editable.

pending: IStorageTransactionLog;
done: IStorageTransactionLog;
}>;

export interface IStorageAddress {
the: The;
of: Entity;
at: string[];
/**
* Represents adddress within the memory space which is like pointer inside the
* fact value in the memory.
*/
export interface IMemoryAddress {
/**
* URI to an entitiy. It corresponds to `of` field in the memory protocol.
*/
id: URI;
/**
* Media type under which data is stored. It corresponds to `the` field in the
* memory protocol.
*/
type: MediaType;
/**
* Path to the {@link JSONValue} being reference by this address. It is path
* within the `is` field of the fact in memory protocol.
*/
path: MemoryAddressPathComponent[];
}

export type MemoryAddressPathComponent = string | number;

export interface IStorageTransactionLog
extends Iterable<IStorageTransactionInvariant> {
get(address: IStorageAddress): IStorageTransactionInvariant;
get(address: IMemoryAddress): IStorageTransactionInvariant;
}

export type IStorageTransactionInvariant = Variant<{
read: Read;
write: Write;
}>;

/**
* Error is returned on an attempt to open writer in a transaction that already
* has a writer for a different space.
*/
export interface IStorageTransactionWriteIsolationError extends Error {
name: "StorageTransactionWriteIsolationError";

/**
* Memory space writer that is already open.
*/
open: MemorySpace;

/**
* Memory space writer could not be opened for.
*/
requested: MemorySpace;
}

/**
* Describes read invariant of the underlaying transaction.
*/
export interface Read {
readonly the: The;
readonly of: Entity;
readonly at: string[];
readonly is?: JSONValue;
readonly address: IMemoryAddress;
readonly value?: JSONValue;
readonly cause: Reference;
}

/**
* Describes write invariant of the underlaying transaction.
*/
export interface Write {
readonly the: The;
readonly of: Entity;
readonly at: string[];
readonly is?: JSONValue;
readonly address: IMemoryAddress;
readonly value?: JSONValue;
readonly cause: Reference;
}