-
Notifications
You must be signed in to change notification settings - Fork 9
Add Storage Subscription Capability Interfaces #1386
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
d21965a
8e8906b
9af87ee
cc1a594
620d5c2
2414542
d9704c1
35b12a7
98b7d36
d70c009
acd3b43
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -49,6 +49,8 @@ import type { | |
| IStorageManagerV2, | ||
| IStorageProvider, | ||
| IStorageProviderWithReplica, | ||
| IStorageSubscription, | ||
| IStorageSubscriptionCapability, | ||
| IStorageTransaction, | ||
| IStoreError, | ||
| ITransaction, | ||
|
|
@@ -62,6 +64,8 @@ export * from "@commontools/memory/interface"; | |
| import { Channel, RawCommand } from "./inspector.ts"; | ||
| import { SchemaNone } from "@commontools/memory/schema"; | ||
| import * as Transaction from "./transaction.ts"; | ||
| import * as SubscriptionManager from "./subscription.ts"; | ||
| import * as Differential from "./differential.ts"; | ||
|
|
||
| export type { Result, Unit }; | ||
| export interface Selector<Key> extends Iterable<Key> { | ||
|
|
@@ -430,6 +434,11 @@ export class Replica { | |
| * partially replicated into local cache. | ||
| */ | ||
| public remote: MemorySpaceSession, | ||
| /** | ||
| * Storage subscription that needs to be notified when state of the replica | ||
| * changes. | ||
| */ | ||
| public subscription: IStorageSubscription, | ||
| /** | ||
| * Represents persisted cache of the memory state that was fetched in one | ||
| * of the sessions. If IDB is not available in this runtime we do not have | ||
|
|
@@ -577,6 +586,7 @@ export class Replica { | |
| fetchedEntries = [...fetchedEntries, ...query.schemaFacts]; | ||
| } | ||
| const fetched = fetchedEntries.map(([fact, _schema]) => fact); | ||
| const changes = Differential.create().update(this, fetched); | ||
| this.heap.merge(fetched, Replica.put); | ||
|
|
||
| // Remote may not have all the requested entries. We denitrify them by | ||
|
|
@@ -604,6 +614,17 @@ export class Replica { | |
| // Add notFound entries to the heap and also persist them in the cache. | ||
| // Don't notify subscribers as if this were a server update. | ||
| this.heap.merge(notFound, Replica.put, (val) => false); | ||
| // Add entries for the facts we have not found we don't need to compare | ||
| // those as we already know we don't have them in the replica. | ||
| changes.set(notFound); | ||
|
|
||
| // Notify storage subscription about changes that were pulled from the remote | ||
| this.subscription.next({ | ||
| type: "pull", | ||
| space: this.did(), | ||
| changes: changes.close(), | ||
| }); | ||
|
|
||
| const result = await this.cache.merge(revisions.values(), Replica.put); | ||
|
|
||
| for (const [revision, schema] of fetchedEntries) { | ||
|
|
@@ -676,9 +697,18 @@ export class Replica { | |
| if (error) { | ||
| return { error }; | ||
| } else { | ||
| const values = [...pulled.values()]; | ||
| // If number of pulled records is less than what we requested we have some | ||
| // some records that we'll need to fetch. | ||
| this.heap.merge(pulled.values(), Replica.put); | ||
|
|
||
| // Notify storage subscribers that we have loaded some data. | ||
| this.subscription.next({ | ||
| type: "load", | ||
| space: this.did(), | ||
| changes: Differential.load(values), | ||
| }); | ||
|
|
||
| // If number of items pulled from cache is less than number of needed items | ||
| // we did not have everything we needed in cache, in which case we will | ||
| // have to wait until fetch is complete. | ||
|
|
@@ -769,10 +799,19 @@ export class Replica { | |
| } | ||
| } | ||
|
|
||
| async commit({ facts, claims }: ITransaction) { | ||
| async commit(transaction: ITransaction, source?: IStorageTransaction) { | ||
| const { facts, claims } = transaction; | ||
| const changes = Differential.create().update(this, facts); | ||
| // Store facts in a nursery so that subsequent changes will be build | ||
| // optimistically assuming that push will succeed. | ||
| this.nursery.merge(facts, Nursery.put); | ||
| // Notify storage subscribers about the committed changes. | ||
| this.subscription.next({ | ||
| type: "commit", | ||
| space: this.did(), | ||
| changes, | ||
| source, | ||
| }); | ||
| // Track all our pending changes | ||
| facts.map((fact) => | ||
| this.pendingNurseryChanges.add(toKey(fact), fact.cause.toString()) | ||
|
|
@@ -794,13 +833,27 @@ export class Replica { | |
| ); | ||
| this.seenNurseryChanges.delete(toKey(fact)); | ||
| } | ||
|
|
||
| // Checkout current state of facts so we can compute | ||
| // changes after we update underlying stores. | ||
| const checkout = Differential.checkout(this, facts); | ||
|
|
||
| this.nursery.merge(facts, Nursery.delete); | ||
| const fact = result.error.name === "ConflictError" && | ||
| result.error.conflict.actual; | ||
| // We also update heap so it holds latest record | ||
| if (fact) { | ||
| this.heap.merge([fact], Replica.update); | ||
| this.heap.merge([fact], Replica.put); | ||
| } | ||
|
|
||
| // Notify storage subscribers about the reverted transaction. | ||
| this.subscription.next({ | ||
| type: "revert", | ||
| space: this.did(), | ||
| changes: checkout.compare(this), | ||
| reason: result.error, | ||
| source, | ||
| }); | ||
| } // | ||
| // If transaction succeeded we promote facts from nursery into a heap. | ||
| else { | ||
|
|
@@ -815,50 +868,29 @@ export class Replica { | |
| // Avoid sending out updates to subscribers if it's a fact we already | ||
| // know about in the nursery. | ||
| const localFacts = this.getLocalFacts(revisions); | ||
| // Turn facts into revisions corresponding with the commit. | ||
| this.heap.merge( | ||
| revisions, | ||
| Replica.put, | ||
| (revision) => revision === undefined || !localFacts.has(revision), | ||
| ); | ||
|
|
||
| // We only delete from the nursery when we've seen all of our pending | ||
| // facts (or gotten a conflict). | ||
| // Server facts may have newer nursery changes that we want to keep. | ||
| const freshFacts = revisions.filter((revision) => | ||
| this.pendingNurseryChanges.get(toKey(revision))?.size ?? 0 === 0 | ||
| ); | ||
| this.nursery.merge(freshFacts, Nursery.delete); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor issue going from delete on freshFacts to evict on all facts here.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
It will not because
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should probably have a test for this scenario though
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're right! Thanks
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This does still have an issue with two clients and rapid clicks. The evict version leads to a conflict storm where we go up and down in a loop, while the delete version stabilizes. |
||
| for (const fact of freshFacts) { | ||
| this.pendingNurseryChanges.delete(toKey(fact)); | ||
| this.seenNurseryChanges.delete(toKey(fact)); | ||
| } | ||
| } | ||
| // If transaction fails we delete facts from the nursery so that new | ||
| // changes will not build upon rejected state. If there are other inflight | ||
| // transactions that already were built upon our facts they will also fail. | ||
| if (result.error) { | ||
| this.nursery.merge(facts, Nursery.delete); | ||
| const fact = result.error.name === "ConflictError" && | ||
| result.error.conflict.actual; | ||
| // We also update heap so it holds latest record | ||
| if (fact) { | ||
| this.heap.merge([fact], Replica.update); | ||
| } | ||
| } // | ||
| // If transaction succeeded we promote facts from nursery into a heap. | ||
| else { | ||
| const commit = toRevision(result.ok); | ||
| const { since } = commit.is; | ||
| const revisions = [ | ||
| ...facts.map((fact) => ({ ...fact, since })), | ||
| // We strip transaction info so we don't duplicate same data | ||
| { ...commit, is: { since: commit.is.since } }, | ||
| ]; | ||
| // Turn facts into revisions corresponding with the commit. | ||
| this.heap.merge(revisions, Replica.put); | ||
|
|
||
| // Evict redundant facts which we just merged into `heap` so that reads | ||
| // will occur from `heap`. This way future changes upstream we not get | ||
| // shadowed by prior local changes. | ||
| this.nursery.merge(facts, Nursery.evict); | ||
|
|
||
| for (const fact of freshFacts) { | ||
| this.pendingNurseryChanges.delete(toKey(fact)); | ||
| this.seenNurseryChanges.delete(toKey(fact)); | ||
| } | ||
| } | ||
|
|
||
| return result; | ||
|
|
@@ -880,13 +912,20 @@ export class Replica { | |
| // should have the same since on the second, so we can clear them from | ||
| // tracking when we see them. | ||
| const resolvedFacts = this.getLocalFacts(revisions); | ||
| const checkout = Differential.checkout(this, revisions); | ||
| // We use put here instead of update, since we may have received new docs | ||
| // that we weren't already tracking. | ||
| this.heap.merge( | ||
| revisions, | ||
| Replica.put, | ||
| (state) => state === undefined || !resolvedFacts.has(state), | ||
| ); | ||
|
|
||
| this.subscription.next({ | ||
| type: "integrate", | ||
| space: this.did(), | ||
| changes: checkout.compare(this), | ||
| }); | ||
| return this.cache.merge(revisions, Replica.update); | ||
| } | ||
|
|
||
|
|
@@ -946,6 +985,11 @@ export class Replica { | |
| this.selectorTracker = new SelectorTracker(); | ||
| // Clear the pull queue | ||
| this.queue = new PullQueue(); | ||
|
|
||
| this.subscription.next({ | ||
| type: "reset", | ||
| space: this.did(), | ||
| }); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -970,6 +1014,7 @@ export interface RemoteStorageProviderSettings { | |
|
|
||
| export interface RemoteStorageProviderOptions { | ||
| session: Consumer.MemoryConsumer<MemorySpace>; | ||
| subscription: IStorageSubscription; | ||
| space: MemorySpace; | ||
| the?: string; | ||
| settings?: IRemoteStorageProviderSettings; | ||
|
|
@@ -1287,6 +1332,7 @@ export class Provider implements IStorageProvider { | |
| session: Consumer.MemoryConsumer<MemorySpace>; | ||
| spaces: Map<string, Replica>; | ||
| settings: IRemoteStorageProviderSettings; | ||
| subscription: IStorageSubscription; | ||
|
|
||
| subscribers: Map<string, Set<(value: StorageValue<JSONValue>) => void>> = | ||
| new Map(); | ||
|
|
@@ -1304,6 +1350,7 @@ export class Provider implements IStorageProvider { | |
|
|
||
| constructor({ | ||
| session, | ||
| subscription, | ||
| space, | ||
| the = "application/json", | ||
| settings = defaultSettings, | ||
|
|
@@ -1312,6 +1359,7 @@ export class Provider implements IStorageProvider { | |
| this.settings = settings; | ||
| this.session = session; | ||
| this.spaces = new Map(); | ||
| this.subscription = subscription; | ||
| this.workspace = this.mount(space); | ||
| } | ||
|
|
||
|
|
@@ -1333,7 +1381,12 @@ export class Provider implements IStorageProvider { | |
| } else { | ||
| const session = this.session.mount(space); | ||
| // FIXME(@ubik2): Disabling the cache while I ensure things work correctly | ||
| const replica = new Replica(space, session, new NoCache()); | ||
| const replica = new Replica( | ||
| space, | ||
| session, | ||
| this.subscription, | ||
| new NoCache(), | ||
| ); | ||
| replica.useSchemaQueries = this.settings.useSchemaQueries; | ||
| replica.poll(); | ||
| this.spaces.set(space, replica); | ||
|
|
@@ -1533,12 +1586,17 @@ export interface Options { | |
| settings?: IRemoteStorageProviderSettings; | ||
| } | ||
|
|
||
| export class StorageManager implements IStorageManager, IStorageManagerV2 { | ||
| export class StorageManager | ||
| implements | ||
| IStorageManager, | ||
| IStorageManagerV2, | ||
| IStorageSubscriptionCapability { | ||
| address: URL; | ||
| as: Signer; | ||
| id: string; | ||
| settings: IRemoteStorageProviderSettings; | ||
| #providers: Map<string, IStorageProviderWithReplica> = new Map(); | ||
| #subscription = SubscriptionManager.create(); | ||
|
|
||
| static open(options: Options) { | ||
| if (options.address.protocol === "memory:") { | ||
|
|
@@ -1582,6 +1640,7 @@ export class StorageManager implements IStorageManager, IStorageManagerV2 { | |
| space, | ||
| address, | ||
| settings, | ||
| subscription: this.#subscription, | ||
| session: Consumer.create({ as }), | ||
| }); | ||
| } | ||
|
|
@@ -1603,6 +1662,13 @@ export class StorageManager implements IStorageManager, IStorageManagerV2 { | |
| edit(): IStorageTransaction { | ||
| return Transaction.create(this); | ||
| } | ||
|
|
||
| /** | ||
| * Subscribes to changes in the storage. | ||
| */ | ||
| subscribe(subscription: IStorageSubscription): void { | ||
| this.#subscription.subscribe(subscription); | ||
| } | ||
| } | ||
|
|
||
| export const getChanges = ( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's unlikely that we were missing the fact in our heap, so update would usually have the same behavior, but this is more correct (put).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I actually encountered it in one of the added tests.