Skip to content
Merged
11 changes: 11 additions & 0 deletions packages/runner/src/storage/cache.deno.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ import {
Provider,
StorageManager as BaseStorageManager,
} from "./cache.ts";
import * as StorageSubscription from "./subscription.ts";
import type { MemorySpace } from "@commontools/memory/interface";
import type { IStorageSubscription } from "./interface.ts";
export * from "./cache.ts";

export class StorageManagerEmulator extends BaseStorageManager {
#session?: Consumer.MemoryConsumer<MemorySpace>;
#subscription = StorageSubscription.create();

#providers: Map<string, Provider> = new Map();

Expand All @@ -27,12 +30,20 @@ export class StorageManagerEmulator extends BaseStorageManager {
return Provider.open({
space,
session: this.session(),
subscription: this.#subscription,
});
}

mount(space: MemorySpace) {
return this.session().mount(space);
}

/**
* Subscribes to changes in the storage.
*/
override subscribe(subscription: IStorageSubscription): void {
this.#subscription.subscribe(subscription);
}
}

export class StorageManager extends BaseStorageManager {
Expand Down
132 changes: 99 additions & 33 deletions packages/runner/src/storage/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ import type {
IStorageManagerV2,
IStorageProvider,
IStorageProviderWithReplica,
IStorageSubscription,
IStorageSubscriptionCapability,
IStorageTransaction,
IStoreError,
ITransaction,
Expand All @@ -62,6 +64,8 @@ export * from "@commontools/memory/interface";
import { Channel, RawCommand } from "./inspector.ts";
import { SchemaNone } from "@commontools/memory/schema";
import * as Transaction from "./transaction.ts";
import * as SubscriptionManager from "./subscription.ts";
import * as Differential from "./differential.ts";

export type { Result, Unit };
export interface Selector<Key> extends Iterable<Key> {
Expand Down Expand Up @@ -430,6 +434,11 @@ export class Replica {
* partially replicated into local cache.
*/
public remote: MemorySpaceSession,
/**
* Storage subscription that needs to be notified when state of the replica
* changes.
*/
public subscription: IStorageSubscription,
/**
* Represents persisted cache of the memory state that was fetched in one
* of the sessions. If IDB is not available in this runtime we do not have
Expand Down Expand Up @@ -577,6 +586,7 @@ export class Replica {
fetchedEntries = [...fetchedEntries, ...query.schemaFacts];
}
const fetched = fetchedEntries.map(([fact, _schema]) => fact);
const changes = Differential.create().update(this, fetched);
this.heap.merge(fetched, Replica.put);

// Remote may not have all the requested entries. We denitrify them by
Expand Down Expand Up @@ -604,6 +614,17 @@ export class Replica {
// Add notFound entries to the heap and also persist them in the cache.
// Don't notify subscribers as if this were a server update.
this.heap.merge(notFound, Replica.put, (val) => false);
// Add entries for the facts we have not found we don't need to compare
// those as we already know we don't have them in the replica.
changes.set(notFound);

// Notify storage subscription about changes that were pulled from the remote
this.subscription.next({
type: "pull",
space: this.did(),
changes: changes.close(),
});

const result = await this.cache.merge(revisions.values(), Replica.put);

for (const [revision, schema] of fetchedEntries) {
Expand Down Expand Up @@ -676,9 +697,18 @@ export class Replica {
if (error) {
return { error };
} else {
const values = [...pulled.values()];
// If number of pulled records is less than what we requested we have some
// some records that we'll need to fetch.
this.heap.merge(pulled.values(), Replica.put);

// Notify storage subscribers that we have loaded some data.
this.subscription.next({
type: "load",
space: this.did(),
changes: Differential.load(values),
});

// If number of items pulled from cache is less than number of needed items
// we did not have everything we needed in cache, in which case we will
// have to wait until fetch is complete.
Expand Down Expand Up @@ -769,10 +799,19 @@ export class Replica {
}
}

async commit({ facts, claims }: ITransaction) {
async commit(transaction: ITransaction, source?: IStorageTransaction) {
const { facts, claims } = transaction;
const changes = Differential.create().update(this, facts);
// Store facts in a nursery so that subsequent changes will be build
// optimistically assuming that push will succeed.
this.nursery.merge(facts, Nursery.put);
// Notify storage subscribers about the committed changes.
this.subscription.next({
type: "commit",
space: this.did(),
changes,
source,
});
// Track all our pending changes
facts.map((fact) =>
this.pendingNurseryChanges.add(toKey(fact), fact.cause.toString())
Expand All @@ -794,13 +833,27 @@ export class Replica {
);
this.seenNurseryChanges.delete(toKey(fact));
}

// Checkout current state of facts so we can compute
// changes after we update underlying stores.
const checkout = Differential.checkout(this, facts);

this.nursery.merge(facts, Nursery.delete);
const fact = result.error.name === "ConflictError" &&
result.error.conflict.actual;
// We also update heap so it holds latest record
if (fact) {
this.heap.merge([fact], Replica.update);
this.heap.merge([fact], Replica.put);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's unlikely that we were missing the fact in our heap, so update would usually have the same behavior, but this is more correct (put).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually encountered it in one of the added tests.

}

// Notify storage subscribers about the reverted transaction.
this.subscription.next({
type: "revert",
space: this.did(),
changes: checkout.compare(this),
reason: result.error,
source,
});
} //
// If transaction succeeded we promote facts from nursery into a heap.
else {
Expand All @@ -815,50 +868,29 @@ export class Replica {
// Avoid sending out updates to subscribers if it's a fact we already
// know about in the nursery.
const localFacts = this.getLocalFacts(revisions);
// Turn facts into revisions corresponding with the commit.
this.heap.merge(
revisions,
Replica.put,
(revision) => revision === undefined || !localFacts.has(revision),
);

// We only delete from the nursery when we've seen all of our pending
// facts (or gotten a conflict).
// Server facts may have newer nursery changes that we want to keep.
const freshFacts = revisions.filter((revision) =>
this.pendingNurseryChanges.get(toKey(revision))?.size ?? 0 === 0
);
this.nursery.merge(freshFacts, Nursery.delete);
Copy link
Contributor

@ubik2 ubik2 Jul 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor issue going from delete on freshFacts to evict on all facts here.
Imagine a counter, where the user presses +,+,-

  • send 1 -- counter shows 1
  • recieve 1
  • send 2 -- counter shows 2
  • send 3 -- counter shows 3
  • send 2 -- counter shows 2
  • receive 2 (the first one)
    • evict: removes from nursery, since they are the same - counter still shows 2
    • freshFacts/delete: doesn't remove from nursery, since it's not fresh - counter still shows 2
  • receive 3
    • evict: thinks it's new - counter goes back to 3 (incorrect)
    • freshFacts/delete: doesn't remove from nursery, since it's not fresh - counter still shows 2
  • receive 2 (second one)
    • evict: thinks it's new - counter goes back to 2 (correct now)
    • freshFacts/delete: removes from nursery, since there's no longer any pending - counter still shows 2

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

evict: removes from nursery, since they are the same - counter still shows 2

It will not because cause will be different so equality check will not pass.

Copy link
Contributor Author

@Gozala Gozala Jul 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably have a test for this scenario though

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right! Thanks

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does still have an issue with two clients and rapid clicks. The evict version leads to a conflict storm where we go up and down in a loop, while the delete version stabilizes.
I'll investigate more to determine why.

for (const fact of freshFacts) {
this.pendingNurseryChanges.delete(toKey(fact));
this.seenNurseryChanges.delete(toKey(fact));
}
}
// If transaction fails we delete facts from the nursery so that new
// changes will not build upon rejected state. If there are other inflight
// transactions that already were built upon our facts they will also fail.
if (result.error) {
this.nursery.merge(facts, Nursery.delete);
const fact = result.error.name === "ConflictError" &&
result.error.conflict.actual;
// We also update heap so it holds latest record
if (fact) {
this.heap.merge([fact], Replica.update);
}
} //
// If transaction succeeded we promote facts from nursery into a heap.
else {
const commit = toRevision(result.ok);
const { since } = commit.is;
const revisions = [
...facts.map((fact) => ({ ...fact, since })),
// We strip transaction info so we don't duplicate same data
{ ...commit, is: { since: commit.is.since } },
];
// Turn facts into revisions corresponding with the commit.
this.heap.merge(revisions, Replica.put);

// Evict redundant facts which we just merged into `heap` so that reads
// will occur from `heap`. This way future changes upstream we not get
// shadowed by prior local changes.
this.nursery.merge(facts, Nursery.evict);

for (const fact of freshFacts) {
this.pendingNurseryChanges.delete(toKey(fact));
this.seenNurseryChanges.delete(toKey(fact));
}
}

return result;
Expand All @@ -880,13 +912,20 @@ export class Replica {
// should have the same since on the second, so we can clear them from
// tracking when we see them.
const resolvedFacts = this.getLocalFacts(revisions);
const checkout = Differential.checkout(this, revisions);
// We use put here instead of update, since we may have received new docs
// that we weren't already tracking.
this.heap.merge(
revisions,
Replica.put,
(state) => state === undefined || !resolvedFacts.has(state),
);

this.subscription.next({
type: "integrate",
space: this.did(),
changes: checkout.compare(this),
});
return this.cache.merge(revisions, Replica.update);
}

Expand Down Expand Up @@ -946,6 +985,11 @@ export class Replica {
this.selectorTracker = new SelectorTracker();
// Clear the pull queue
this.queue = new PullQueue();

this.subscription.next({
type: "reset",
space: this.did(),
});
}
}

Expand All @@ -970,6 +1014,7 @@ export interface RemoteStorageProviderSettings {

export interface RemoteStorageProviderOptions {
session: Consumer.MemoryConsumer<MemorySpace>;
subscription: IStorageSubscription;
space: MemorySpace;
the?: string;
settings?: IRemoteStorageProviderSettings;
Expand Down Expand Up @@ -1287,6 +1332,7 @@ export class Provider implements IStorageProvider {
session: Consumer.MemoryConsumer<MemorySpace>;
spaces: Map<string, Replica>;
settings: IRemoteStorageProviderSettings;
subscription: IStorageSubscription;

subscribers: Map<string, Set<(value: StorageValue<JSONValue>) => void>> =
new Map();
Expand All @@ -1304,6 +1350,7 @@ export class Provider implements IStorageProvider {

constructor({
session,
subscription,
space,
the = "application/json",
settings = defaultSettings,
Expand All @@ -1312,6 +1359,7 @@ export class Provider implements IStorageProvider {
this.settings = settings;
this.session = session;
this.spaces = new Map();
this.subscription = subscription;
this.workspace = this.mount(space);
}

Expand All @@ -1333,7 +1381,12 @@ export class Provider implements IStorageProvider {
} else {
const session = this.session.mount(space);
// FIXME(@ubik2): Disabling the cache while I ensure things work correctly
const replica = new Replica(space, session, new NoCache());
const replica = new Replica(
space,
session,
this.subscription,
new NoCache(),
);
replica.useSchemaQueries = this.settings.useSchemaQueries;
replica.poll();
this.spaces.set(space, replica);
Expand Down Expand Up @@ -1533,12 +1586,17 @@ export interface Options {
settings?: IRemoteStorageProviderSettings;
}

export class StorageManager implements IStorageManager, IStorageManagerV2 {
export class StorageManager
implements
IStorageManager,
IStorageManagerV2,
IStorageSubscriptionCapability {
address: URL;
as: Signer;
id: string;
settings: IRemoteStorageProviderSettings;
#providers: Map<string, IStorageProviderWithReplica> = new Map();
#subscription = SubscriptionManager.create();

static open(options: Options) {
if (options.address.protocol === "memory:") {
Expand Down Expand Up @@ -1582,6 +1640,7 @@ export class StorageManager implements IStorageManager, IStorageManagerV2 {
space,
address,
settings,
subscription: this.#subscription,
session: Consumer.create({ as }),
});
}
Expand All @@ -1603,6 +1662,13 @@ export class StorageManager implements IStorageManager, IStorageManagerV2 {
edit(): IStorageTransaction {
return Transaction.create(this);
}

/**
* Subscribes to changes in the storage.
*/
subscribe(subscription: IStorageSubscription): void {
this.#subscription.subscribe(subscription);
}
}

export const getChanges = (
Expand Down
Loading