From 3d0142f291334ca6746d853119cc15636ff99327 Mon Sep 17 00:00:00 2001 From: Robin McCollum Date: Mon, 21 Jul 2025 18:19:59 -0700 Subject: [PATCH 1/2] Minor tweaks - Use await instead of a promise chain in Storage.syncCell. - Use URI isntead of Entity, since we also define this in memory/interface.ts. --- packages/runner/src/storage.ts | 7 ++----- packages/runner/src/storage/cache.ts | 5 +++-- packages/runner/src/storage/interface.ts | 2 +- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/packages/runner/src/storage.ts b/packages/runner/src/storage.ts index bd6c46c53..81c0b7cda 100644 --- a/packages/runner/src/storage.ts +++ b/packages/runner/src/storage.ts @@ -175,11 +175,8 @@ export class Storage implements IStorage { if (!this.shim) { const { space, id } = cell.getAsNormalizedFullLink(); const storageProvider = this._getStorageProviderForSpace(space); - return storageProvider.sync( - id, - false, - schemaContext, - ).then(() => cell); + await storageProvider.sync(id, false, schemaContext); + return cell; } const doc = cell.getDoc(); diff --git a/packages/runner/src/storage/cache.ts b/packages/runner/src/storage/cache.ts index 2e1e5a1ce..374a3c0f5 100644 --- a/packages/runner/src/storage/cache.ts +++ b/packages/runner/src/storage/cache.ts @@ -56,6 +56,7 @@ import type { PushError, Retract, StorageValue, + URI, } from "./interface.ts"; import { BaseStorageProvider } from "./base.ts"; import * as IDB from "./idb.ts"; @@ -1322,7 +1323,7 @@ class ProviderConnection implements IStorageProvider { } sync( - entityId: EntityId | Entity, + entityId: EntityId | URI, expectedInStorage?: boolean, schemaContext?: SchemaContext, ) { @@ -1442,7 +1443,7 @@ export class Provider implements IStorageProvider { } sync( - entityId: EntityId | Entity, + entityId: EntityId | URI, expectedInStorage?: boolean, schemaContext?: SchemaContext, ) { diff --git a/packages/runner/src/storage/interface.ts b/packages/runner/src/storage/interface.ts index adb33a8fe..ac76568dd 100644 --- a/packages/runner/src/storage/interface.ts +++ b/packages/runner/src/storage/interface.ts @@ -5,7 +5,6 @@ import type { AuthorizationError as IAuthorizationError, ConflictError as IConflictError, ConnectionError as IConnectionError, - Entity as URI, Fact, FactAddress, Invariant as IClaim, @@ -20,6 +19,7 @@ import type { The as MediaType, TransactionError, Unit, + URI, Variant, } from "@commontools/memory/interface"; From 2d902bdcace55fd4417ae4aed6f86fe211f3ba3f Mon Sep 17 00:00:00 2001 From: Robin McCollum Date: Tue, 22 Jul 2025 00:07:14 -0700 Subject: [PATCH 2/2] Change IStorageProvider API to be URI based instead of EntityId as well as the BaseStorageProvider implementation and the Storage class. Moved BaseStorageProvider.toEntity into Storage.toURI, since this is the only place it's needed. Changed IStorageProvider to use URI instead of EntityId, as well as the ProviderConnection and Provider classes. Updated tests to use cell.getAsNormalizedFullLink to get the URI instead of EntityId. --- .../runner/integration/reconnection.test.ts | 18 +-- packages/runner/src/doc-map.ts | 7 +- packages/runner/src/storage.ts | 132 +++++++++--------- packages/runner/src/storage/base.ts | 49 ++----- packages/runner/src/storage/cache.ts | 38 +++-- packages/runner/src/storage/interface.ts | 19 ++- packages/runner/src/storage/query.ts | 2 +- packages/runner/test/pending-nursery.test.ts | 5 +- .../runner/test/provider-reconnection.test.ts | 15 +- packages/runner/test/push-conflict.test.ts | 47 +++---- packages/runner/test/storage.test.ts | 33 +++-- 11 files changed, 165 insertions(+), 200 deletions(-) diff --git a/packages/runner/integration/reconnection.test.ts b/packages/runner/integration/reconnection.test.ts index f42ccd589..aa516e2cf 100644 --- a/packages/runner/integration/reconnection.test.ts +++ b/packages/runner/integration/reconnection.test.ts @@ -7,7 +7,7 @@ import { Identity } from "@commontools/identity"; import { StorageManager } from "../src/storage/cache.ts"; -import type { SchemaContext } from "@commontools/memory/interface"; +import type { SchemaContext, URI } from "@commontools/memory/interface"; const TOOLSHED_URL = Deno.env.get("TOOLSHED_API_URL") || "http://localhost:8000"; @@ -72,16 +72,16 @@ const storageManager3 = StorageManager.open({ }); const provider3 = storageManager3.open(signer.did()); console.log(`Provider3 (control) connected to memory server`); - +const uri: URI = `of:${TEST_DOC_ID}`; // Listen for updates on the test-reconnection-counter document // Note: this is not the schema subscription, its just a client-side listener -provider1.sink({ "/": TEST_DOC_ID }, (value) => { +provider1.sink(uri, (value) => { updateCount1++; updates1.push(value.value); console.log(`Provider1 Update #${updateCount1}:`, value.value); }); -provider3.sink({ "/": TEST_DOC_ID }, (value) => { +provider3.sink(uri, (value) => { updateCount3++; updates3.push(value.value); console.log(`Provider3 Update #${updateCount3}:`, value.value); @@ -89,13 +89,13 @@ provider3.sink({ "/": TEST_DOC_ID }, (value) => { // Establish server-side subscription with schema console.log("Establishing subscriptions..."); -await provider1.sync({ "/": TEST_DOC_ID }, true, testSchema); -await provider3.sync({ "/": TEST_DOC_ID }, true, testSchema); +await provider1.sync(uri, true, testSchema); +await provider3.sync(uri, true, testSchema); // Send initial value to server console.log("Sending initial value..."); await provider1.send([{ - entityId: { "/": TEST_DOC_ID }, + uri, value: { value: { value: 1, @@ -155,7 +155,7 @@ console.log(`Connected to memory server as second client`); // Establish server-side subscription with schema console.log("Establishing subscription as second client..."); -await provider2.sync({ "/": TEST_DOC_ID }, true, testSchema); +await provider2.sync(uri, true, testSchema); // Send test updates and check if subscription still works console.log("Sending test updates after disconnection..."); @@ -164,7 +164,7 @@ const intervalId = setInterval(async () => { try { // Send an update as the second const result = await provider2.send([{ - entityId: { "/": TEST_DOC_ID }, + uri, value: { value: { value: testValue++, diff --git a/packages/runner/src/doc-map.ts b/packages/runner/src/doc-map.ts index b44a56879..af87903a6 100644 --- a/packages/runner/src/doc-map.ts +++ b/packages/runner/src/doc-map.ts @@ -1,5 +1,6 @@ import { refer } from "merkle-reference/json"; import { isRecord } from "@commontools/utils/types"; +import { URI } from "@commontools/memory/interface"; import { isOpaqueRef } from "./builder/types.ts"; import { createDoc, type DocImpl, isDoc } from "./doc.ts"; import { @@ -118,19 +119,19 @@ export class DocumentMap implements IDocumentMap { getDocByEntityId( space: MemorySpace, - entityId: EntityId | string, + entityId: EntityId | URI, createIfNotFound?: true, sourceIfCreated?: DocImpl, ): DocImpl; getDocByEntityId( space: MemorySpace, - entityId: EntityId | string, + entityId: EntityId | URI, createIfNotFound: false, sourceIfCreated?: DocImpl, ): DocImpl | undefined; getDocByEntityId( space: MemorySpace, - entityId: EntityId | string, + entityId: EntityId | URI, createIfNotFound = true, sourceIfCreated?: DocImpl, ): DocImpl | undefined { diff --git a/packages/runner/src/storage.ts b/packages/runner/src/storage.ts index 81c0b7cda..2704836f5 100644 --- a/packages/runner/src/storage.ts +++ b/packages/runner/src/storage.ts @@ -7,9 +7,8 @@ import type { SchemaPathSelector, } from "@commontools/memory/interface"; import { type AddCancel, type Cancel, useCancelGroup } from "./cancel.ts"; -import { Cell, isCell, isStream } from "./cell.ts"; +import { type Cell, isCell } from "./cell.ts"; import { type DocImpl, isDoc } from "./doc.ts"; -import { type EntityId, entityIdStr } from "./doc-map.ts"; import type { IExtendedStorageTransaction, IStorageManager, @@ -18,6 +17,7 @@ import type { Labels, StorageNotification, StorageValue, + URI, } from "./storage/interface.ts"; import { log } from "./log.ts"; import type { IRuntime, IStorage } from "./runtime.ts"; @@ -31,9 +31,8 @@ import { isLink } from "./link-utils.ts"; import { ExtendedStorageTransaction, ShimStorageManager, - uriToEntityId, } from "./storage/transaction-shim.ts"; -import { toURI } from "./uri-utils.ts"; +import type { EntityId } from "./doc-map.ts"; export type { Labels, MemorySpace }; /** @@ -78,8 +77,9 @@ export class Storage implements IStorage { // Resolves for the promises above. private loadingResolves = new Map) => void>(); - // We'll also keep track of the subscriptions for the docs - // These don't care about schema, and use the id from the entity id + // We'll also keep track of the subscriptions for the docs. + // These don't care about schema, and use the combination of the space and + // uri as the key. private storageToDocSubs = new Map(); private docToStorageSubs = new Map(); @@ -205,12 +205,9 @@ export class Storage implements IStorage { // Start loading the doc and save the promise so we don't have more than one // caller loading this doc. const storageProvider = this._getStorageProviderForSpace(doc.space); - const result = await storageProvider.sync( - doc.entityId!, - false, - schemaContext, - ); + const uri = Storage.toURI(doc.entityId); + const result = await storageProvider.sync(uri, false, schemaContext); if (result.error) { // This will be a decoupled doc that is not persisted and cannot be edited doc.ephemeral = true; @@ -271,8 +268,8 @@ export class Storage implements IStorage { schemaContext: schemaContext, }; const selectorRef = refer(JSON.stringify(selector)).toString(); - const docId = entityIdStr(doc.entityId); - return `${doc.space}/${docId}/application/json:${selectorRef}`; + const docId = Storage.toURI(doc.entityId); + return `${doc.space}/${docId}/application/json?${selectorRef}`; } // After attempting to load the relevant documents from storage, we can @@ -281,7 +278,7 @@ export class Storage implements IStorage { doc: DocImpl, storageProvider: IStorageProvider, schemaContext?: SchemaContext, - ): Promise { + ): Promise { // Don't update docs while they might be updating. await this.runtime.scheduler.idle(); // Don't update docs while we have pending writes to storage @@ -289,9 +286,10 @@ export class Storage implements IStorage { // Run a schema query against our local content, so we can either send // the set of linked docs, or load them. + const uri = Storage.toURI(doc.entityId); const { missing, loaded, selected } = this._queryLocal( doc.space, - doc.entityId, + uri, storageProvider, schemaContext, ); @@ -299,18 +297,18 @@ export class Storage implements IStorage { // per-space transaction entry, and may have labels. // We'll also ensure we're only dealing with the entity ids that will // be managed in our document map. - const entityIds = loaded.values().map((valueEntry) => valueEntry.source) + const uris = loaded.values().map((valueEntry) => valueEntry.source) .filter((docAddr) => docAddr.the === "application/json" && docAddr.of.startsWith("of:") - ).map((docAddr) => uriToEntityId(docAddr.of)).toArray(); + ).map((docAddr) => docAddr.of).toArray(); // It's ok to be missing the primary record (this is the case when we are // creating it for the first time). if ( missing.length === 1 && - missing[0].of === `of:${entityIdStr(doc.entityId)}` + missing[0].of === uri ) { - entityIds.push(uriToEntityId(missing[0].of)); + uris.push(missing[0].of); // } else if (missing.length > 1) { // console.debug("missing", missing); } @@ -319,14 +317,11 @@ export class Storage implements IStorage { // First, make sure we have all these docs in the runtime document map // This should also handle the source docs, since they will be included // in our query result. - for (const entityId of entityIds) { - docMap.getDocByEntityId(doc.space, entityId, true); + for (const uri of uris) { + docMap.getDocByEntityId(doc.space, uri, true); } // Any objects that aren't on the server may need to be sent there. - const valuesToSend: { - entityId: EntityId; - value: StorageValue; - }[] = []; + const valuesToSend: { uri: URI; value: StorageValue }[] = []; // Now make another pass. At this point, we can leave any docs that aren't // in the DocumentMap alone, but set the cell to the DocImpl for the ones // that are present. @@ -334,16 +329,16 @@ export class Storage implements IStorage { // does, but I don't think we need to do this anymore. // I use the storage provider to get the nursery version if it's more recent. // This makes it so if I have local changes, they aren't lost. - for (const entityId of entityIds) { - const storageValue = storageProvider.get(entityId); - const newDoc = docMap.getDocByEntityId(doc.space, entityId, false)!; + for (const uri of uris) { + const storageValue = storageProvider.get(uri); + const newDoc = docMap.getDocByEntityId(doc.space, uri, false)!; // We don't need to hook up ephemeral docs if (newDoc.ephemeral) { console.log( "Found link to ephemeral doc", - entityIdStr(newDoc.entityId), + uri, "from", - entityIdStr(doc.entityId), + Storage.toURI(doc.entityId), ); continue; } @@ -379,7 +374,7 @@ export class Storage implements IStorage { // The object doesn't exist in storage, but it does in the doc map // TODO(@ubik2): investigate labels // Copy the value in doc for storage, and add to the set of writes - valuesToSend.push({ entityId: entityId, value: docValue }); + valuesToSend.push({ uri: uri, value: docValue }); } // Any updates to these docs should be sent to storage, and any update @@ -394,7 +389,7 @@ export class Storage implements IStorage { return []; } } - return entityIds; + return uris; } // We need to call this for all the docs that will be part of this transaction // This goes through the document and converts the DocImpls in CellLink objects @@ -449,7 +444,7 @@ export class Storage implements IStorage { // full traversal when we don't have a schema. private _queryLocal( space: MemorySpace, - entityId: EntityId, + uri: URI, storageProvider: IStorageProvider, schemaContext: SchemaContext = { schema: true, rootSchema: true }, ) { @@ -459,8 +454,7 @@ export class Storage implements IStorage { this.runtime.documentMap, Storage._cellLinkToJSON, ); - const idString = entityIdStr(entityId); - const docAddress = manager.toAddress(idString); + const docAddress = { of: uri, the: "application/json" }; const selector = { path: [], schemaContext: schemaContext }; return querySchema(selector, [], docAddress, manager); } @@ -478,12 +472,11 @@ export class Storage implements IStorage { JSON.stringify(value), ], ); + const uri = Storage.toURI(doc.entityId); const storageValue = Storage._cellLinkToJSON(doc, labels); const existingValue = this._getStorageProviderForSpace(doc.space).get< JSONValue - >( - doc.entityId, - ); + >(uri); // If our value is the same as what storage has, we don't need to do anything. if (deepEqual(storageValue, existingValue)) { return; @@ -492,11 +485,7 @@ export class Storage implements IStorage { // Track these promises for our synced call. // We may have linked docs that storage doesn't know about const storageProvider = this._getStorageProviderForSpace(doc.space); - const { missing } = this._queryLocal( - doc.space, - doc.entityId, - storageProvider, - ); + const { missing } = this._queryLocal(doc.space, uri, storageProvider); // Any missing docs need to be linked up for (const factAddress of missing) { // missing docs have been created in our doc map, but storage doesn't @@ -504,7 +493,7 @@ export class Storage implements IStorage { // TODO(@ubik2) I've lost the schema here const linkedDoc = this.runtime.documentMap.getDocByEntityId( doc.space, - uriToEntityId(factAddress.of), + factAddress.of, ); // we don't need to await this, since by the time we've resolved our // docToStoragePromise, we'll have added the loadingPromise. @@ -512,7 +501,7 @@ export class Storage implements IStorage { } // If we're already dirty, we don't need to add a promise - const docKey = `${doc.space}/${toURI(doc.entityId)}`; + const docKey = `${doc.space}/${uri}`; if (this.dirtyDocs.has(docKey)) { return; } @@ -536,17 +525,16 @@ export class Storage implements IStorage { await this.runtime.idle(); } - this.dirtyDocs.delete(`${doc.space}/${toURI(doc.entityId)}`); + const uri = Storage.toURI(doc.entityId); + const docKey = `${doc.space}/${uri}`; + this.dirtyDocs.delete(docKey); const storageProvider = this._getStorageProviderForSpace(doc.space); // Create storage value using the helper to ensure consistency const storageValue = Storage._cellLinkToJSON(doc, labels); - await storageProvider.send([{ - entityId: doc.entityId, - value: storageValue, - }]); + await storageProvider.send([{ uri: uri, value: storageValue }]); } // Update the doc with the new value we got in storage. @@ -554,8 +542,9 @@ export class Storage implements IStorage { doc: DocImpl, storageValue: StorageValue, ) { + const uri = Storage.toURI(doc.entityId); // Mark this doc as being processed - const docKey = `${doc.space}/${toURI(doc.entityId)}`; + const docKey = `${doc.space}/${uri}`; this.dirtyDocs.add(docKey); // Increment the counter at the start @@ -614,20 +603,21 @@ export class Storage implements IStorage { private _subscribeToChanges(doc: DocImpl): void { log(() => ["subscribe to changes", JSON.stringify(doc.entityId)]); - const docId = entityIdStr(doc.entityId); + const uri = Storage.toURI(doc.entityId); + const docKey = `${doc.space}/${uri}`; // Clear any existing subscriptions first - we only want one callback // and if we call syncCell multiple times, we would end up // with multiple subscriptions. - if (this.docToStorageSubs.has(docId)) { + if (this.docToStorageSubs.has(docKey)) { // Cancel any existing subscription - this.docToStorageSubs.get(docId)?.(); - this.docToStorageSubs.delete(docId); + this.docToStorageSubs.get(docKey)?.(); + this.docToStorageSubs.delete(docKey); } - if (this.storageToDocSubs.has(docId)) { + if (this.storageToDocSubs.has(docKey)) { // Cancel any existing subscription - this.storageToDocSubs.get(docId)?.(); - this.storageToDocSubs.delete(docId); + this.storageToDocSubs.get(docKey)?.(); + this.storageToDocSubs.delete(docKey); } // Subscribe to doc changes, send updates to storage @@ -635,17 +625,31 @@ export class Storage implements IStorage { this._updateStorage(doc, value, labels) ); this.addCancel(docToStorage); - this.docToStorageSubs.set(docId, docToStorage); + this.docToStorageSubs.set(docKey, docToStorage); // This will be called when we get an update from the server, // and merge the changes into the heap. const storageToDoc = this._getStorageProviderForSpace(doc.space).sink< JSONValue - >( - doc.entityId!, - async (storageValue) => await this._updateDoc(doc, storageValue), - ); + >(uri, async (storageValue) => await this._updateDoc(doc, storageValue)); this.addCancel(storageToDoc); - this.storageToDocSubs.set(docId, storageToDoc); + this.storageToDocSubs.set(docKey, storageToDoc); + } + + static toURI(source: EntityId): URI { + if (typeof source["/"] === "string") { + return `of:${source["/"]}`; + } else if (source.toJSON) { + return `of:${source.toJSON()["/"]}`; + } else { + throw Object.assign( + new TypeError( + `💣 Got entity ID that is neither merkle reference nor {'/'}`, + ), + { + cause: source, + }, + ); + } } } diff --git a/packages/runner/src/storage/base.ts b/packages/runner/src/storage/base.ts index a7485ca75..f175504ac 100644 --- a/packages/runner/src/storage/base.ts +++ b/packages/runner/src/storage/base.ts @@ -1,7 +1,6 @@ import { SchemaContext } from "../builder/types.ts"; -import type { Entity, Result, Unit } from "@commontools/memory/interface"; +import type { Result, Unit, URI } from "@commontools/memory/interface"; import type { Cancel } from "../cancel.ts"; -import type { EntityId } from "../doc-map.ts"; import { log } from "../log.ts"; import { IStorageProvider, StorageValue } from "./interface.ts"; export type { Result, Unit }; @@ -12,36 +11,31 @@ export abstract class BaseStorageProvider implements IStorageProvider { protected waitingForSyncResolvers = new Map void>(); abstract send( - batch: { entityId: EntityId; value: StorageValue }[], + batch: { uri: URI; value: StorageValue }[], ): Promise< { ok: object; error?: undefined } | { ok?: undefined; error: Error } >; abstract sync( - entityId: EntityId, + uri: URI, expectedInStorage: boolean, schemaContext?: SchemaContext, ): Promise>; // TODO(@ubik2) //): Promise>, Error>>; - abstract get(entityId: EntityId): StorageValue | undefined; + abstract get(uri: URI): StorageValue | undefined; - sink( - entityId: EntityId, - callback: (value: StorageValue) => void, - ): Cancel { - const key = JSON.stringify(entityId); - - if (!this.subscribers.has(key)) { - this.subscribers.set(key, new Set<(value: StorageValue) => void>()); + sink(uri: URI, callback: (value: StorageValue) => void): Cancel { + if (!this.subscribers.has(uri)) { + this.subscribers.set(uri, new Set<(value: StorageValue) => void>()); } - const listeners = this.subscribers.get(key)!; + const listeners = this.subscribers.get(uri)!; listeners.add(callback); return () => { listeners.delete(callback); - if (listeners.size === 0) this.subscribers.delete(key); + if (listeners.size === 0) this.subscribers.delete(uri); }; } @@ -79,29 +73,4 @@ export abstract class BaseStorageProvider implements IStorageProvider { abstract destroy(): Promise; abstract getReplica(): string | undefined; - - static toEntity(source: EntityId | string): Entity { - if (typeof source === "string") { - if (!source.includes(":")) { - throw new TypeError( - `💣 Got entity ID that is a string, but not a URI: ${source}`, - ); - } - return source as Entity; - } - if (typeof source["/"] === "string") { - return `of:${source["/"]}`; - } else if (source.toJSON) { - return `of:${source.toJSON()["/"]}`; - } else { - throw Object.assign( - new TypeError( - `💣 Got entity ID that is neither merkle reference nor {'/'}`, - ), - { - cause: source, - }, - ); - } - } } diff --git a/packages/runner/src/storage/cache.ts b/packages/runner/src/storage/cache.ts index 374a3c0f5..0b2e0ea5c 100644 --- a/packages/runner/src/storage/cache.ts +++ b/packages/runner/src/storage/cache.ts @@ -58,7 +58,6 @@ import type { StorageValue, URI, } from "./interface.ts"; -import { BaseStorageProvider } from "./base.ts"; import * as IDB from "./idb.ts"; export * from "@commontools/memory/interface"; import { Channel, RawCommand } from "./inspector.ts"; @@ -1316,25 +1315,25 @@ class ProviderConnection implements IStorageProvider { } sink( - entityId: EntityId, + uri: URI, callback: (value: StorageValue) => void, ) { - return this.provider.sink(entityId, callback); + return this.provider.sink(uri, callback); } sync( - entityId: EntityId | URI, + uri: URI, expectedInStorage?: boolean, schemaContext?: SchemaContext, ) { - return this.provider.sync(entityId, expectedInStorage, schemaContext); + return this.provider.sync(uri, expectedInStorage, schemaContext); } - get(entityId: EntityId): StorageValue | undefined { - return this.provider.get(entityId); + get(uri: URI): StorageValue | undefined { + return this.provider.get(uri); } send( - batch: { entityId: EntityId; value: StorageValue }[], + batch: { uri: URI; value: StorageValue }[], ) { return this.provider.send(batch); } @@ -1415,15 +1414,14 @@ export class Provider implements IStorageProvider { } sink( - entityId: EntityId, + uri: URI, callback: (value: StorageValue) => void, ): Cancel { const { the } = this; - const of = BaseStorageProvider.toEntity(entityId); // Capture workspace locally, so that if it changes later, our cancel // will unsubscribe with the same object. const { workspace } = this; - const address = { the, of }; + const address = { the: this.the, of: uri }; const subscriber = (revision?: Revision) => { // If since is -1, this is not a real revision, so don't notify subscribers if (revision && revision.since !== -1) { @@ -1443,13 +1441,12 @@ export class Provider implements IStorageProvider { } sync( - entityId: EntityId | URI, + uri: URI, expectedInStorage?: boolean, schemaContext?: SchemaContext, ) { const { the } = this; - const of = BaseStorageProvider.toEntity(entityId); - const factAddress = { the, of }; + const factAddress = { the, of: uri }; if (schemaContext) { const selector = { path: [], schemaContext: schemaContext }; // We track this server subscription, and don't re-issue it -- @@ -1470,17 +1467,14 @@ export class Provider implements IStorageProvider { } } - get(entityId: EntityId): StorageValue | undefined { - const entity = this.workspace.get({ - the: this.the, - of: BaseStorageProvider.toEntity(entityId), - }); + get(uri: URI): StorageValue | undefined { + const entity = this.workspace.get({ the: this.the, of: uri }); return entity?.is as StorageValue | undefined; } async send( - batch: { entityId: EntityId; value: StorageValue }[], + batch: { uri: URI; value: StorageValue }[], ): Promise< Result< Unit, @@ -1496,8 +1490,8 @@ export class Provider implements IStorageProvider { const TheLabel = "application/label+json" as const; const changes = []; - for (const { entityId, value } of batch) { - const of = BaseStorageProvider.toEntity(entityId); + for (const { uri, value } of batch) { + const of = uri; const content = value.value !== undefined ? JSON.stringify({ value: value.value, source: value.source }) : undefined; diff --git a/packages/runner/src/storage/interface.ts b/packages/runner/src/storage/interface.ts index ac76568dd..31a1242e4 100644 --- a/packages/runner/src/storage/interface.ts +++ b/packages/runner/src/storage/interface.ts @@ -115,24 +115,24 @@ export interface IStorageProvider { /** * Send a value to storage. * - * @param batch - Batch of entity IDs & values to send. + * @param batch - Batch of entity uri & values to send. * @returns Promise that resolves when the value is sent. */ send( - batch: { entityId: EntityId; value: StorageValue }[], + batch: { uri: URI; value: StorageValue }[], ): Promise>; /** * Sync a value from storage. Use `get()` to retrieve the value. * - * @param entityId - Entity ID to sync. + * @param uri - uri of the entity to sync. * @param expectedInStorage - Wait for the value, it's assumed to be in * storage eventually. * @param schemaContext - The schemaContext that determines what to sync. * @returns Promise that resolves when the value is synced. */ sync( - entityId: EntityId | URI, + uri: URI, expectedInStorage?: boolean, schemaContext?: SchemaContext, ): Promise>; @@ -140,22 +140,19 @@ export interface IStorageProvider { /** * Get a value from the local cache reflecting storage. Call `sync()` first. * - * @param entityId - Entity ID to get the value for. + * @param uri - uri of the entity to get the value for. * @returns Value or undefined if the value is not in storage. */ - get(entityId: EntityId): StorageValue | undefined; + get(uri: URI): StorageValue | undefined; /** * Subscribe to storage updates. * - * @param entityId - Entity ID to subscribe to. + * @param uri - uri of the entity to subscribe to. * @param callback - Callback function. * @returns Cancel function to stop the subscription. */ - sink( - entityId: EntityId, - callback: (value: StorageValue) => void, - ): Cancel; + sink(uri: URI, callback: (value: StorageValue) => void): Cancel; /** * Destroy the storage provider. Used for tests only. diff --git a/packages/runner/src/storage/query.ts b/packages/runner/src/storage/query.ts index e7fc069fa..70479ea98 100644 --- a/packages/runner/src/storage/query.ts +++ b/packages/runner/src/storage/query.ts @@ -140,7 +140,7 @@ export class DocObjectManager extends ClientObjectManager { return rv; } // Next, check the storage provider - const storageEntry = this.storageProvider.get(entityId); + const storageEntry = this.storageProvider.get(doc.of); if (storageEntry !== undefined) { const valEntryValue: { value: JSONValue; source?: { "/": string } } = { value: storageEntry.value, diff --git a/packages/runner/test/pending-nursery.test.ts b/packages/runner/test/pending-nursery.test.ts index 7eb86c9ee..eafe7173f 100644 --- a/packages/runner/test/pending-nursery.test.ts +++ b/packages/runner/test/pending-nursery.test.ts @@ -7,7 +7,6 @@ import type { JSONSchema } from "@commontools/runner"; import { Provider } from "../src/storage/cache.ts"; import * as Subscription from "../src/storage/subscription.ts"; import { IRuntime, Runtime } from "../src/runtime.ts"; -import { toURI } from "../src/uri-utils.ts"; import { IStorageManager, IStorageSubscription, @@ -71,14 +70,14 @@ describe("Provider Subscriptions", () => { await runtime.storage.syncCell(cell1); await runtime.storage.synced(); - const uri = toURI(cell1.entityId); + const uri = cell1.getAsNormalizedFullLink().id; const tx = runtime.edit(); cell1.withTx(tx).set(1); await tx.commit(); await runtime.storage.synced(); - expect(provider.get(cell1.entityId)).toEqual({ value: 1 }); + expect(provider.get(uri)).toEqual({ value: 1 }); let s1Count = 0; diff --git a/packages/runner/test/provider-reconnection.test.ts b/packages/runner/test/provider-reconnection.test.ts index e9062464d..f070512f1 100644 --- a/packages/runner/test/provider-reconnection.test.ts +++ b/packages/runner/test/provider-reconnection.test.ts @@ -4,8 +4,7 @@ import { Identity } from "@commontools/identity"; import { Provider } from "../src/storage/cache.ts"; import * as Memory from "@commontools/memory"; import * as Consumer from "@commontools/memory/consumer"; -import type { Entity, SchemaContext } from "@commontools/memory/interface"; -import type { EntityId } from "@commontools/runner"; +import type { SchemaContext, URI } from "@commontools/memory/interface"; import * as Subscription from "../src/storage/subscription.ts"; const signer = await Identity.fromPassphrase("test operator"); @@ -51,12 +50,12 @@ describe("Provider Reconnection", () => { rootSchema: { type: "object", properties: { age: { type: "number" } } }, }; - const entityId1: EntityId = { "/": "user-1" }; - const entityId2: EntityId = { "/": "user-2" }; + const uri1: URI = "of:user-1"; + const uri2: URI = "of:user-2"; // Initial sync to establish subscriptions - await provider.sync(entityId1, true, schema1); - await provider.sync(entityId2, true, schema2); + await provider.sync(uri1, true, schema1); + await provider.sync(uri2, true, schema2); // Override the workspace's pull function to track calls const pullCalls: Array<[any, any?][]> = []; @@ -106,8 +105,8 @@ describe("Provider Reconnection", () => { rootSchema: { type: "object" }, }; - await provider.sync({ "/": "good-entity" }, true, schema); - await provider.sync({ "/": "bad-entity" }, true, schema); + await provider.sync("of:good-entity", true, schema); + await provider.sync("of:bad-entity", true, schema); // Make pull fail const originalPull = provider.workspace.pull.bind(provider.workspace); diff --git a/packages/runner/test/push-conflict.test.ts b/packages/runner/test/push-conflict.test.ts index 0ea70aae5..81d01fbdc 100644 --- a/packages/runner/test/push-conflict.test.ts +++ b/packages/runner/test/push-conflict.test.ts @@ -69,6 +69,7 @@ describe.skip("Push conflict", () => { ); list.set([]); const listDoc = list.getDoc(); + const listURI = list.getAsNormalizedFullLink().id; await list.sync(); const source = session.clone(); @@ -85,15 +86,12 @@ describe.skip("Push conflict", () => { }); // Update memory without notifying main storage - await memory.sync(list.entityId!, true); // Get current value - expect(memory.get(list.entityId!)).toEqual({ value: [] }); + await memory.sync(listURI, true); // Get current value + expect(memory.get(listURI)).toEqual({ value: [] }); - await memory.send([{ - entityId: list.entityId!, - value: { value: [1, 2, 3] }, - }]); + await memory.send([{ uri: listURI, value: { value: [1, 2, 3] } }]); - expect(memory.get(list.entityId!)).toEqual({ value: [1, 2, 3] }); + expect(memory.get(listURI)).toEqual({ value: [1, 2, 3] }); let retryCalled = false; listDoc.retry = [(value) => { @@ -149,17 +147,15 @@ describe.skip("Push conflict", () => { subscription: Subscription.create(), }); + const nameURI = name.getAsNormalizedFullLink().id; + const listURI = list.getAsNormalizedFullLink().id; // Update memory without notifying main storage - await memory.sync(name.entityId!, true); // Get current value - await memory.sync(list.entityId!, true); // Get current value - await memory.send([{ - entityId: name.entityId!, - value: { value: "foo" }, - }, { - entityId: list.entityId!, - value: { value: [1, 2, 3] }, - }]); - + await memory.sync(nameURI, true); // Get current value + await memory.sync(listURI, true); // Get current value + await memory.send([ + { uri: nameURI, value: { value: "foo" } }, + { uri: listURI, value: { value: [1, 2, 3] } }, + ]); let retryCalled = 0; listDoc.retry = [(value) => { retryCalled++; @@ -218,16 +214,15 @@ describe.skip("Push conflict", () => { subscription: Subscription.create(), }); + const nameURI = name.getAsNormalizedFullLink().id; + const listURI = list.getAsNormalizedFullLink().id; // Update memory without notifying main storage - await memory.sync(name.entityId!, true); // Get current value - await memory.sync(list.entityId!, true); // Get current value - await memory.send([{ - entityId: name.entityId!, - value: { value: "foo" }, - }, { - entityId: list.entityId!, - value: { value: [{ n: 1 }, { n: 2 }, { n: 3 }] }, - }]); + await memory.sync(nameURI, true); // Get current value + await memory.sync(listURI, true); // Get current value + await memory.send([ + { uri: nameURI, value: { value: "foo" } }, + { uri: listURI, value: { value: [{ n: 1 }, { n: 2 }, { n: 3 }] } }, + ]); let retryCalled = 0; listDoc.retry = [(value) => { diff --git a/packages/runner/test/storage.test.ts b/packages/runner/test/storage.test.ts index 85745f4fe..8c77d8250 100644 --- a/packages/runner/test/storage.test.ts +++ b/packages/runner/test/storage.test.ts @@ -2,7 +2,7 @@ import { afterEach, beforeEach, describe, it } from "@std/testing/bdd"; import { expect } from "@std/expect"; import { Runtime } from "../src/runtime.ts"; import { type Cell } from "../src/cell.ts"; -import { type IExtendedStorageTransaction } from "../src/storage/interface.ts"; +import type { IExtendedStorageTransaction } from "../src/storage/interface.ts"; import { Identity } from "@commontools/identity"; import { StorageManager } from "@commontools/runner/storage/cache.deno"; @@ -80,7 +80,9 @@ describe("Storage", () => { await testCell.sync(); - const entry = storageManager.open(space).get(refCell.entityId!); + const entry = storageManager.open(space).get( + refCell.getAsNormalizedFullLink().id, + ); expect(entry?.value).toEqual("hello"); }); @@ -101,7 +103,8 @@ describe("Storage", () => { await testCell.sync(); - const entry = storageManager.open(space).get(refCell.entityId!); + const refCellURI = refCell.getAsNormalizedFullLink().id; + const entry = storageManager.open(space).get(refCellURI); expect(entry?.value).toEqual("hello"); }); }); @@ -133,7 +136,8 @@ describe("Storage", () => { it("should wait for a doc to appear", async () => { let synced = false; - storageManager.open(space).sync(testCell.entityId!, true).then( + const testCellURI = testCell.getAsNormalizedFullLink().id; + storageManager.open(space).sync(testCellURI, true).then( () => (synced = true), ); expect(synced).toBe(false); @@ -145,7 +149,8 @@ describe("Storage", () => { it("should wait for a undefined doc to appear", async () => { let synced = false; - storageManager.open(space).sync(testCell.entityId!, true).then( + const testCellURI = testCell.getAsNormalizedFullLink().id; + storageManager.open(space).sync(testCellURI, true).then( () => (synced = true), ); expect(synced).toBe(false); @@ -157,11 +162,12 @@ describe("Storage", () => { it("should wait for a undefined doc to appear with schema and double sync", async () => { let synced = false; const schemaContext = { schema: true, rootSchema: true }; - storageManager.open(space).sync(testCell.entityId!, true, schemaContext) + const testCellURI = testCell.getAsNormalizedFullLink().id; + storageManager.open(space).sync(testCellURI, true, schemaContext) .then( () => (synced = true), ); - storageManager.open(space).sync(testCell.entityId!, true, schemaContext) + storageManager.open(space).sync(testCellURI, true, schemaContext) .then( () => (synced = true), ); @@ -178,19 +184,20 @@ describe("Storage", () => { describe("ephemeral docs", () => { it("should not be loaded from storage", async () => { - const ephemeralDoc = runtime.getCell( + const ephemeralCell = runtime.getCell( space, "ephemeral", undefined, tx, ); - ephemeralDoc.set("transient"); - ephemeralDoc.getDoc().ephemeral = true; - await ephemeralDoc.sync(); + ephemeralCell.set("transient"); + ephemeralCell.getDoc().ephemeral = true; + await ephemeralCell.sync(); const provider = storageManager.open(space); - await provider.sync(ephemeralDoc.entityId!); - const record = provider.get(ephemeralDoc.entityId!); + const ephemeralCellURI = ephemeralCell.getAsNormalizedFullLink().id; + await provider.sync(ephemeralCellURI); + const record = provider.get(ephemeralCellURI); expect(record).toBeUndefined(); }); });