Skip to content

Commit e07b5c6

Browse files
authored
Fix extra sink calls (#991)
* fix extraneous sink calls * extra logging in storage for debugging
1 parent 8c15b8f commit e07b5c6

File tree

2 files changed

+21
-9
lines changed

2 files changed

+21
-9
lines changed

runner/src/cell.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import { type EntityId, getDocByEntityId, getEntityId } from "./doc-map.ts";
2020
import { type Cancel, isCancel, useCancelGroup } from "./cancel.ts";
2121
import { validateAndTransform } from "./schema.ts";
2222
import { type Schema } from "@commontools/builder";
23+
import { compactifyPaths } from "./scheduler.ts";
2324

2425
/**
2526
* This is the regular Cell interface, generated by DocImpl.asCell().
@@ -555,14 +556,24 @@ function subscribeToReferencedDocs<T>(
555556

556557
// Subscribe to the docs that are read (via logs), call callback on next change.
557558
const cancel = subscribe((log) => {
559+
const newLog = {
560+
reads: [],
561+
writes: [],
562+
} satisfies ReactivityLog;
563+
558564
if (isCancel(cleanup)) cleanup();
559565
const newValue = validateAndTransform(
560566
doc,
561567
path,
562568
schema,
563-
log,
569+
newLog,
564570
rootSchema,
565571
) as T;
572+
// Copy reads to log _before_ calling the callback, as we're only interested
573+
// in dependencies for the initial get, not further cells the callback might
574+
// read. The callback is responsible for calling sink on those cells if it
575+
// wants to stay updated.
576+
log.reads.push(...newLog.reads);
566577
cleanup = callback(newValue);
567578
}, initialLog);
568579

runner/src/storage.ts

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -441,20 +441,20 @@ class StorageImpl implements Storage {
441441
source: doc.sourceCell?.entityId,
442442
};
443443

444-
// 🤔 I'm guessing we should be storting schema here
444+
// 🤔 I'm guessing we should be storing schema here
445445

446446
if (JSON.stringify(value) !== JSON.stringify(this.writeValues.get(doc))) {
447-
this.writeDependentDocs.set(doc, dependencies);
448-
this.writeValues.set(doc, value);
449-
450-
this._addToBatch([{ doc, type: "storage" }]);
451-
452447
log(() => [
453448
"prep for storage",
454449
JSON.stringify(doc.entityId),
455450
value,
451+
this.writeValues.get(doc),
456452
[...dependencies].map((c) => JSON.stringify(c.entityId)),
457453
]);
454+
this.writeDependentDocs.set(doc, dependencies);
455+
this.writeValues.set(doc, value);
456+
457+
this._addToBatch([{ doc, type: "storage" }]);
458458
}
459459
}
460460

@@ -788,12 +788,13 @@ class StorageImpl implements Storage {
788788
return storage.send(jobs).then((result) => retryOnConflict(result))
789789
.then((result) => {
790790
if (result.ok) {
791+
log(() => ["storage ok", JSON.stringify(result.ok, null, 2)]);
791792
// Apply updates from retry, if transaction ultimately succeeded
792793
updatesFromRetry.forEach(([doc, value]) =>
793794
this._batchForDoc(doc, value.value, value.source)
794795
);
795796
} else if (result.error) {
796-
log(() => ["storage error", result.error]);
797+
log(() => ["storage error", JSON.stringify(result.error, null, 2)]);
797798
console.error("storage error", result.error);
798799
}
799800
return result;
@@ -803,7 +804,7 @@ class StorageImpl implements Storage {
803804
// Write all storage jobs to storage, in parallel
804805
const promiseJobs = [];
805806
for (const [space, jobs] of storageJobsBySpace.entries()) {
806-
promiseJobs.push(process(space, jobs));
807+
if (jobs.length) promiseJobs.push(process(space, jobs));
807808
}
808809
await Promise.all(promiseJobs);
809810
log(() => ["storage jobs done"]);

0 commit comments

Comments
 (0)