Skip to content

Commit 782da23

Browse files
authored
feat(runner): implement await storage.synced() for new TX system (#1503)
feat(runner): implement await storage.synced() for new TX system Add synced() method to wait for all pending syncs and commits to complete: - Add synced() to IStorageManager and IStorageProvider interfaces - Track commit promises in Replica to await pending transactions - Implement synced() in Provider to wait for both subscriptions and commits - Add getAllPromises() to SelectorTracker for subscription tracking - Update Storage.synced() to use new implementation when not in shim mode This ensures all pending operations complete before synced() resolves, including both document syncs and transaction commits.
1 parent 4b393d2 commit 782da23

File tree

6 files changed

+66
-1
lines changed

6 files changed

+66
-1
lines changed

packages/runner/src/storage.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,10 @@ export class Storage implements IStorage {
224224
}
225225

226226
async synced(): Promise<void> {
227+
if (!this.shim) {
228+
return this.storageManager.synced();
229+
}
230+
227231
await Promise.all([
228232
...this.loadingPromises.values(),
229233
...this.docToStoragePromises.values(),

packages/runner/src/storage/base.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ export abstract class BaseStorageProvider implements IStorageProvider {
2424
// TODO(@ubik2)
2525
//): Promise<Result<Selection<FactAddress, Revision<State>>, Error>>;
2626

27+
abstract synced(): Promise<void>;
28+
2729
abstract get<T = any>(uri: URI): StorageValue<T> | undefined;
2830

2931
sink<T = any>(uri: URI, callback: (value: StorageValue<T>) => void): Cancel {

packages/runner/src/storage/cache.ts

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,10 @@ class SelectorTracker<T = Result<Unit, Error>> {
378378
return this.selectorPromises.get(promiseKey);
379379
}
380380

381+
getAllPromises(): Iterable<Promise<T>> {
382+
return this.selectorPromises.values();
383+
}
384+
381385
/**
382386
* Return all tracked subscriptions as an array of {factAddress, selector} pairs
383387
*/
@@ -478,6 +482,9 @@ export class Replica {
478482
public queue: PullQueue = new PullQueue(),
479483
public pullRetryLimit: number = 100,
480484
public useSchemaQueries: boolean = false,
485+
// Promises for commits that are in flight
486+
public commitPromises: Set<Consumer.TransactionResult<MemorySpace>> =
487+
new Set(),
481488
// Track the selectors used for top level docs -- we only add to this
482489
// once we've gotten our results (so the promise is resolved).
483490
private selectorTracker: SelectorTracker = new SelectorTracker(),
@@ -834,9 +841,12 @@ export class Replica {
834841
);
835842

836843
// These push transaction that will commit desired state to a remote.
837-
const result = await this.remote.transact({
844+
const commitPromise = this.remote.transact({
838845
changes: getChanges([...claims, ...facts] as Statement[]),
839846
});
847+
this.commitPromises.add(commitPromise);
848+
const result = await commitPromise;
849+
this.commitPromises.delete(commitPromise);
840850

841851
// If transaction fails we delete facts from the nursery so that new
842852
// changes will not build upon rejected state. If there are other inflight
@@ -1332,6 +1342,10 @@ class ProviderConnection implements IStorageProvider {
13321342
return this.provider.sync(uri, expectedInStorage, schemaContext);
13331343
}
13341344

1345+
synced() {
1346+
return this.provider.synced();
1347+
}
1348+
13351349
get<T = any>(uri: URI): StorageValue<T> | undefined {
13361350
return this.provider.get(uri);
13371351
}
@@ -1470,6 +1484,13 @@ export class Provider implements IStorageProvider {
14701484
}
14711485
}
14721486

1487+
synced() {
1488+
return Promise.all([
1489+
Promise.all(this.serverSubscriptions.getAllPromises()),
1490+
Promise.all(this.workspace.commitPromises),
1491+
]) as unknown as Promise<void>;
1492+
}
1493+
14731494
get<T = any>(uri: URI): StorageValue<T> | undefined {
14741495
const entity = this.workspace.get({ the: this.the, of: uri });
14751496

@@ -1690,6 +1711,20 @@ export class StorageManager implements IStorageManager {
16901711
subscribe(subscription: IStorageSubscription): void {
16911712
this.#subscription.subscribe(subscription);
16921713
}
1714+
1715+
/**
1716+
* Wait for all pending syncs to complete + the microtask queue to flush, so
1717+
* that they are also all processed.
1718+
*
1719+
* @returns Promise that resolves when all pending syncs are complete.
1720+
*/
1721+
synced(): Promise<void> {
1722+
const { resolve, promise } = Promise.withResolvers<void>();
1723+
Promise.all(
1724+
this.#providers.values().map((provider) => provider.synced()),
1725+
).finally(() => setTimeout(() => resolve(), 0));
1726+
return promise;
1727+
}
16931728
}
16941729

16951730
export const getChanges = (

packages/runner/src/storage/interface.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,16 +74,25 @@ export interface StorageValue<T = any> {
7474

7575
export interface IStorageManager extends IStorageSubscriptionCapability {
7676
id: string;
77+
7778
/**
7879
* @deprecated
7980
*/
8081
open(space: MemorySpace): IStorageProviderWithReplica;
82+
8183
/**
8284
* Creates a storage transaction that can be used to read / write data into
8385
* locally replicated memory spaces. Transaction allows reading from many
8486
* multiple spaces but writing only to one space.
8587
*/
8688
edit(): IStorageTransaction;
89+
90+
/**
91+
* Wait for all pending syncs to complete.
92+
*
93+
* @returns Promise that resolves when all pending syncs are complete.
94+
*/
95+
synced(): Promise<void>;
8796
}
8897

8998
export interface IRemoteStorageProviderSettings {
@@ -137,6 +146,14 @@ export interface IStorageProvider {
137146
schemaContext?: SchemaContext,
138147
): Promise<Result<Unit, Error>>;
139148

149+
/**
150+
* Wait for all pending syncs to complete, that is all pending document syncs
151+
* and all pending commits.
152+
*
153+
* @returns Promise that resolves when all pending syncs are complete.
154+
*/
155+
synced(): Promise<void>;
156+
140157
/**
141158
* Get a value from the local cache reflecting storage. Call `sync()` first.
142159
*

packages/runner/test/push-conflict.test.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ describe.skip("Push conflict", () => {
3636
subscribe(_subscription: IStorageSubscription) {
3737
throw new Error("Not implemented");
3838
},
39+
synced() {
40+
return Promise.resolve();
41+
},
3942
};
4043

4144
beforeEach(() => {

packages/runner/test/transaction-notfound.test.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ class MockStorageManager implements IStorageManager {
5252
return Transaction.create(this);
5353
}
5454

55+
synced() {
56+
return Promise.resolve();
57+
}
58+
5559
subscribe() {}
5660
}
5761

0 commit comments

Comments
 (0)