Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/runner/src/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,10 @@ export class Storage implements IStorage {
}

async synced(): Promise<void> {
if (!this.shim) {
return this.storageManager.synced();
}

await Promise.all([
...this.loadingPromises.values(),
...this.docToStoragePromises.values(),
Expand Down
2 changes: 2 additions & 0 deletions packages/runner/src/storage/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ export abstract class BaseStorageProvider implements IStorageProvider {
// TODO(@ubik2)
//): Promise<Result<Selection<FactAddress, Revision<State>>, Error>>;

abstract synced(): Promise<void>;

abstract get<T = any>(uri: URI): StorageValue<T> | undefined;

sink<T = any>(uri: URI, callback: (value: StorageValue<T>) => void): Cancel {
Expand Down
37 changes: 36 additions & 1 deletion packages/runner/src/storage/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,10 @@ class SelectorTracker<T = Result<Unit, Error>> {
return this.selectorPromises.get(promiseKey);
}

getAllPromises(): Iterable<Promise<T>> {
return this.selectorPromises.values();
}

/**
* Return all tracked subscriptions as an array of {factAddress, selector} pairs
*/
Expand Down Expand Up @@ -478,6 +482,9 @@ export class Replica {
public queue: PullQueue = new PullQueue(),
public pullRetryLimit: number = 100,
public useSchemaQueries: boolean = false,
// Promises for commits that are in flight
public commitPromises: Set<Consumer.TransactionResult<MemorySpace>> =
new Set(),
// Track the selectors used for top level docs -- we only add to this
// once we've gotten our results (so the promise is resolved).
private selectorTracker: SelectorTracker = new SelectorTracker(),
Expand Down Expand Up @@ -834,9 +841,12 @@ export class Replica {
);

// These push transaction that will commit desired state to a remote.
const result = await this.remote.transact({
const commitPromise = this.remote.transact({
changes: getChanges([...claims, ...facts] as Statement[]),
});
this.commitPromises.add(commitPromise);
const result = await commitPromise;
this.commitPromises.delete(commitPromise);

// If transaction fails we delete facts from the nursery so that new
// changes will not build upon rejected state. If there are other inflight
Expand Down Expand Up @@ -1332,6 +1342,10 @@ class ProviderConnection implements IStorageProvider {
return this.provider.sync(uri, expectedInStorage, schemaContext);
}

synced() {
return this.provider.synced();
}

get<T = any>(uri: URI): StorageValue<T> | undefined {
return this.provider.get(uri);
}
Expand Down Expand Up @@ -1470,6 +1484,13 @@ export class Provider implements IStorageProvider {
}
}

synced() {
return Promise.all([
Promise.all(this.serverSubscriptions.getAllPromises()),
Promise.all(this.workspace.commitPromises),
]) as unknown as Promise<void>;
Copy link

Copilot AI Jul 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The type assertion as unknown as Promise<void> is unsafe and obscures the actual return type. Consider using void in the Promise.all() call or explicitly handling the array result.

Suggested change
]) as unknown as Promise<void>;
]).then(() => {});

Copilot uses AI. Check for mistakes.
}

get<T = any>(uri: URI): StorageValue<T> | undefined {
const entity = this.workspace.get({ the: this.the, of: uri });

Expand Down Expand Up @@ -1690,6 +1711,20 @@ export class StorageManager implements IStorageManager {
subscribe(subscription: IStorageSubscription): void {
this.#subscription.subscribe(subscription);
}

/**
* Wait for all pending syncs to complete + the microtask queue to flush, so
* that they are also all processed.
*
* @returns Promise that resolves when all pending syncs are complete.
*/
synced(): Promise<void> {
const { resolve, promise } = Promise.withResolvers<void>();
Promise.all(
this.#providers.values().map((provider) => provider.synced()),
).finally(() => setTimeout(() => resolve(), 0));
return promise;
}
}

export const getChanges = (
Expand Down
17 changes: 17 additions & 0 deletions packages/runner/src/storage/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,25 @@ export interface StorageValue<T = any> {

export interface IStorageManager extends IStorageSubscriptionCapability {
id: string;

/**
* @deprecated
*/
open(space: MemorySpace): IStorageProviderWithReplica;

/**
* Creates a storage transaction that can be used to read / write data into
* locally replicated memory spaces. Transaction allows reading from many
* multiple spaces but writing only to one space.
*/
edit(): IStorageTransaction;

/**
* Wait for all pending syncs to complete.
*
* @returns Promise that resolves when all pending syncs are complete.
*/
synced(): Promise<void>;
}

export interface IRemoteStorageProviderSettings {
Expand Down Expand Up @@ -137,6 +146,14 @@ export interface IStorageProvider {
schemaContext?: SchemaContext,
): Promise<Result<Unit, Error>>;

/**
* Wait for all pending syncs to complete, that is all pending document syncs
* and all pending commits.
*
* @returns Promise that resolves when all pending syncs are complete.
*/
synced(): Promise<void>;

/**
* Get a value from the local cache reflecting storage. Call `sync()` first.
*
Expand Down
3 changes: 3 additions & 0 deletions packages/runner/test/push-conflict.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ describe.skip("Push conflict", () => {
subscribe(_subscription: IStorageSubscription) {
throw new Error("Not implemented");
},
synced() {
return Promise.resolve();
},
};

beforeEach(() => {
Expand Down
4 changes: 4 additions & 0 deletions packages/runner/test/transaction-notfound.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ class MockStorageManager implements IStorageManager {
return Transaction.create(this);
}

synced() {
return Promise.resolve();
}

subscribe() {}
}

Expand Down