Skip to content

Commit f7d1d5d

Browse files
authored
Don't check subscriptions with read only transactions (#1823)
* Don't check subscriptions with read only transactions Some cleanup to avoid some of our complex nested classes. * Add a flag to Replica.push so that by default, we don't push transactions that are only Claims. * Alter transaction to prevent commit of nothing but claims. Add comment to Replica.push to note that we generally bypass that now (still used in tests and curl). * Remove Replica.push and change Provider.send to directly commit instead. (#1827)
1 parent 2de9d9c commit f7d1d5d

File tree

5 files changed

+109
-161
lines changed

5 files changed

+109
-161
lines changed

packages/memory/provider.ts

Lines changed: 47 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ import {
3535
import * as SelectionBuilder from "./selection.ts";
3636
import * as Memory from "./memory.ts";
3737
import { refer } from "./reference.ts";
38-
import { redactCommit } from "./space.ts";
38+
import { redactCommitData } from "./space.ts";
3939
import * as Subscription from "./subscription.ts";
4040
import * as FactModule from "./fact.ts";
4141
import { setRevision } from "@commontools/memory/selection";
@@ -366,43 +366,50 @@ class MemoryProviderSession<
366366
}
367367

368368
async commit(commit: Commit<Space>) {
369-
// First, check to see if any of our schema queries need to be notified
370-
// Any queries that lack access are skipped (with a console log)
371-
const [lastId, maxSince, facts] = await this.getSchemaSubscriptionMatches(
372-
commit,
373-
);
374-
375-
// We need to remove any classified results from our commit.
376-
// The schema subscription has a classification claim, but these don't.
377-
const redactedCommit = redactCommit(commit);
378-
const [[space, _ignored]] = Object.entries(commit);
369+
// We should really only have one item, but it's technically legal to have
370+
// multiple transactions in the same commit, so iterate
371+
for (
372+
const item of SelectionBuilder.iterate<{ is: Memory.CommitData }>(commit)
373+
) {
374+
// We need to remove any classified results from our commit.
375+
// The schema subscription has a classification claim, but these don't.
376+
const redactedData = redactCommitData(item.value.is);
377+
if (Subscription.isTransactionReadOnly(redactedData.transaction)) {
378+
continue;
379+
}
380+
// First, check to see if any of our schema queries need to be notified
381+
// Any queries that lack access are skipped (with a console log)
382+
const [lastId, maxSince, facts] = await this.getSchemaSubscriptionMatches(
383+
redactedData.transaction,
384+
);
379385

380-
const jobIds: InvocationURL<Reference<Subscribe>>[] = [];
381-
for (const [id, channels] of this.channels) {
382-
if (Subscription.match(redactedCommit, channels)) {
383-
jobIds.push(id);
386+
const jobIds: InvocationURL<Reference<Subscribe>>[] = [];
387+
for (const [id, channels] of this.channels) {
388+
if (Subscription.match(redactedData.transaction, channels)) {
389+
jobIds.push(id);
390+
}
384391
}
385-
}
386392

387-
if (jobIds.length > 0) {
388-
// The client has a subscription to the space's commit log, but our
389-
// subscriptions may trigger inclusion of other objects. Add these here.
390-
const factsList = [...FactModule.iterate(facts[space as Space])];
391-
const enhancedCommit: EnhancedCommit<Space> = {
392-
commit: redactedCommit,
393-
revisions: this.filterKnownFacts(factsList),
394-
};
395-
396-
for (const id of jobIds) {
397-
// this is sent to a standard subscription (application/commit+json)
398-
this.perform({
399-
the: "task/effect",
400-
of: id,
401-
is: enhancedCommit,
402-
});
393+
if (jobIds.length > 0) {
394+
// The client has a subscription to the space's commit log, but our
395+
// subscriptions may trigger inclusion of other objects. Add these here.
396+
const enhancedCommit: EnhancedCommit<Space> = {
397+
commit: {
398+
[item.of]: { [item.the]: { [item.cause]: { is: redactedData } } },
399+
} as Commit<Space>,
400+
revisions: this.filterKnownFacts(facts),
401+
};
402+
403+
for (const id of jobIds) {
404+
// this is sent to a standard subscription (application/commit+json)
405+
this.perform({
406+
the: "task/effect",
407+
of: id,
408+
is: enhancedCommit,
409+
});
410+
}
403411
}
404412
}
405-
406413
return { ok: {} };
407414
}
408415

@@ -470,17 +477,18 @@ class MemoryProviderSession<
470477
}
471478

472479
private async getSchemaSubscriptionMatches<Space extends MemorySpace>(
473-
commit: Commit<Space>,
474-
): Promise<[JobId | undefined, number, Selection<Space>]> {
480+
transaction: Transaction<Space>,
481+
): Promise<[JobId | undefined, number, Revision<Fact>[]]> {
475482
const schemaMatches = new Map<string, Revision<Fact>>();
483+
const space = transaction.sub;
476484
let maxSince = -1;
477485
let lastId;
478-
const [[spaceStr, _attributes]] = Object.entries(commit);
479-
const space = spaceStr as Space;
480486
// Eventually, we should support multiple spaces, but currently the since handling is per-space
481487
// Our websockets are also per-space, so there's larger issues involved.
482488
for (const [id, subscription] of this.schemaChannels) {
483-
if (Subscription.match(commit, subscription.watchedObjects)) {
489+
if (
490+
Subscription.match(transaction, subscription.watchedObjects)
491+
) {
484492
// Re-run our original query, but not as a subscription
485493
const newArgs = { ...subscription.invocation.args, subscribe: false };
486494
const newInvocation = { ...subscription.invocation, args: newArgs };
@@ -519,11 +527,7 @@ class MemoryProviderSession<
519527
maxSince = since > maxSince ? since : maxSince;
520528
}
521529
}
522-
const selection = SelectionBuilder.from(
523-
schemaMatches.values().map((item) => [item, item.since]),
524-
);
525-
const selectionSpace = { [space]: selection } as Selection<Space>;
526-
return [lastId, maxSince, selectionSpace];
530+
return [lastId, maxSince, [...schemaMatches.values()]];
527531
}
528532
}
529533

packages/memory/space.ts

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -992,20 +992,9 @@ export function getClassifications(
992992
return classifications;
993993
}
994994

995-
export function redactCommit(commit: Commit): Commit {
996-
const newCommit = {};
997-
for (const item of iterate(commit)) {
998-
const redactedData = redactCommitData(item.value.is);
999-
set(newCommit, item.of, item.the, item.cause, { is: redactedData });
1000-
}
1001-
return newCommit;
1002-
}
1003-
1004995
// Return the item with any classified results and the labels removed.
1005-
export function redactCommitData(
1006-
commitData?: CommitData,
1007-
): CommitData | undefined {
1008-
if (commitData === undefined || commitData.labels === undefined) {
996+
export function redactCommitData(commitData: CommitData): CommitData {
997+
if (commitData.labels === undefined) {
1009998
return commitData;
1010999
}
10111000
// Make a copy of the transaction with no changes

packages/memory/subscription.ts

Lines changed: 36 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,37 @@
11
import {
22
Cause,
3-
Commit,
43
Entity,
54
MemorySpace,
65
SchemaSelector,
76
Selector,
87
The,
8+
Transaction,
99
} from "./interface.ts";
10+
import { iterate } from "./selection.ts";
1011
import { COMMIT_LOG_TYPE } from "./commit.ts";
1112

12-
export const match = (commit: Commit, watched: Set<string>) => {
13-
for (const at of Object.keys(commit) as MemorySpace[]) {
14-
const commitObj = commit[at][COMMIT_LOG_TYPE] ?? {};
15-
for (const { is: { transaction } } of Object.values(commitObj)) {
16-
// If commit on this space are watched we have a match
17-
if (matchAddress(watched, { the: COMMIT_LOG_TYPE, of: at, at })) {
18-
return true;
19-
}
13+
export const match = (
14+
transaction: Transaction<MemorySpace>,
15+
watched: Set<string>,
16+
) => {
17+
const space = transaction.sub;
18+
// If commit on this space are watched we have a match
19+
if (
20+
matchAddress(watched, { at: space, of: space, the: COMMIT_LOG_TYPE })
21+
) {
22+
return true;
23+
}
2024

21-
// Otherwise we consider individual in the commit transaction to figure
22-
// out if we have a match.
23-
for (const [of, attributes] of Object.entries(transaction.args.changes)) {
24-
for (const [the, changes] of Object.entries(attributes)) {
25-
for (const change of Object.values(changes)) {
26-
// If `change == true` we simply confirm that state has not changed
27-
// so we don't need to notify those subscribers.
28-
if (
29-
change !== true &&
30-
matchAddress(watched, { at: transaction.sub, the, of })
31-
) {
32-
return true;
33-
}
34-
}
35-
}
36-
}
25+
// Otherwise we consider individual in the commit transaction to figure
26+
// out if we have a match.
27+
for (const fact of iterate(transaction.args.changes)) {
28+
// If `fact.value == true` we simply confirm that state has not changed
29+
// so we don't need to notify those subscribers.
30+
if (
31+
fact.value !== true &&
32+
matchAddress(watched, { at: space, of: fact.of, the: fact.the })
33+
) {
34+
return true;
3735
}
3836
}
3937

@@ -42,9 +40,9 @@ export const match = (commit: Commit, watched: Set<string>) => {
4240

4341
const matchAddress = (
4442
watched: Set<string>,
45-
{ at, the, of }: { the: string; of: string; at: MemorySpace },
43+
{ at, of, the }: { at: MemorySpace; of: string; the: string },
4644
) =>
47-
watched.has(formatAddress({ at, the, of })) ||
45+
watched.has(formatAddress({ at, of, the })) ||
4846
watched.has(formatAddress({ at, the })) ||
4947
watched.has(formatAddress({ at, of })) ||
5048
watched.has(formatAddress({ at }));
@@ -98,6 +96,17 @@ export const fromSelector = function* (selector: Selector | SchemaSelector) {
9896
}
9997
};
10098

99+
export function isTransactionReadOnly(
100+
transaction: Transaction<MemorySpace>,
101+
): boolean {
102+
for (const fact of iterate(transaction.args.changes)) {
103+
if (fact.value !== true) {
104+
return false;
105+
}
106+
}
107+
return true;
108+
}
109+
101110
export const formatAddress = (
102111
{ at = "_", of = "_", the = "_" }: {
103112
at?: string;

packages/runner/src/storage/cache.ts

Lines changed: 23 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -971,73 +971,6 @@ export class Replica {
971971
this.syncTimer = setTimeout(this.pull, this.syncTimeout);
972972
}
973973

974-
/**
975-
* Attempts to commit and push changes to the remote. It optimistically
976-
* updates local state so that subsequent commits can be made without having
977-
* awaiting for each commit to succeed. However if commit fails all the local
978-
* changes made will be reverted back to the last merged state.
979-
*
980-
* ⚠️ Please note that if commits stack up e.g by incrementing value of the
981-
* same entity, rejected commit will ripple through stack as the changes there
982-
* would assume state that is rejected.
983-
*/
984-
async push(
985-
changes: (Assert | Retract | Claim)[],
986-
): Promise<
987-
Result<
988-
Commit,
989-
PushError
990-
>
991-
> {
992-
// Generate the selectors for the various change objects
993-
const loadArgs: [BaseMemoryAddress, SchemaPathSelector?][] = changes.map((
994-
change,
995-
) => {
996-
const schema = generateSchemaFromLabels(change);
997-
const selector = schema === undefined
998-
? undefined
999-
: { path: [], schemaContext: { schema: schema, rootSchema: schema } };
1000-
return [{ id: change.of, type: change.the }, selector];
1001-
});
1002-
// First we pull all the affected entries into heap so we can build a
1003-
// transaction that is aware of latest state.
1004-
const { error } = await this.load(loadArgs);
1005-
if (error) {
1006-
return { error };
1007-
} else {
1008-
// Collect facts so that we can derive desired state and a corresponding
1009-
// transaction
1010-
const facts: Fact[] = [];
1011-
const claims: Invariant[] = [];
1012-
for (const { the, of, is, claim } of changes) {
1013-
const fact = this.get({ id: of, type: the });
1014-
1015-
if (claim) {
1016-
claims.push(claimState(fact!));
1017-
} else if (is === undefined) {
1018-
// If `is` is `undefined` we want to retract the fact.
1019-
// If local `is` in the local state is also `undefined` desired state
1020-
// matches current state in which case we omit this change from the
1021-
// transaction, otherwise we retract fact.
1022-
if (fact?.is !== undefined) {
1023-
facts.push(retract(fact));
1024-
}
1025-
} else {
1026-
facts.push(assert({
1027-
the,
1028-
of,
1029-
is,
1030-
// If fact has no `cause` it is unclaimed fact.
1031-
cause: fact?.cause ? fact : null,
1032-
}));
1033-
}
1034-
}
1035-
1036-
// These push transaction that will commit desired state to a remote.
1037-
return this.commit({ facts, claims });
1038-
}
1039-
}
1040-
1041974
async commit(transaction: ITransaction, source?: IStorageTransaction) {
1042975
const { facts, claims } = transaction;
1043976
const changes = Differential.create().update(this, facts);
@@ -1616,6 +1549,7 @@ class ProviderConnection implements IStorageProvider {
16161549
get<T = any>(uri: URI): StorageValue<T> | undefined {
16171550
return this.provider.get(uri);
16181551
}
1552+
16191553
send<T = any>(
16201554
batch: { uri: URI; value: StorageValue<T> }[],
16211555
) {
@@ -1760,6 +1694,8 @@ export class Provider implements IStorageProvider {
17601694
return entity?.is as StorageValue<T> | undefined;
17611695
}
17621696

1697+
// This is mostly just used by tests and tools, since the transactions will
1698+
// directly commit their results.
17631699
async send<T = any>(
17641700
batch: { uri: URI; value: StorageValue<T> }[],
17651701
): Promise<
@@ -1768,7 +1704,9 @@ export class Provider implements IStorageProvider {
17681704
const { the, workspace } = this;
17691705
const LABEL_TYPE = "application/label+json" as const;
17701706

1771-
const changes = [];
1707+
// Collect facts so that we can derive desired state and a corresponding
1708+
// transaction
1709+
const facts: Fact[] = [];
17721710
for (const { uri, value } of batch) {
17731711
const content = value.value !== undefined
17741712
? JSON.stringify({ value: value.value, source: value.source })
@@ -1779,29 +1717,37 @@ export class Provider implements IStorageProvider {
17791717
if (content !== undefined) {
17801718
// ⚠️ We do JSON roundtrips to strip off the undefined values that
17811719
// cause problems with serialization.
1782-
changes.push({ of: uri, the, is: JSON.parse(content) as JSONValue });
1720+
facts.push(assert({
1721+
the,
1722+
of: uri,
1723+
is: JSON.parse(content) as JSONValue,
1724+
// If fact has no `cause` it is unclaimed fact.
1725+
cause: current?.cause ? current : null,
1726+
}));
17831727
} else {
1784-
changes.push({ of: uri, the });
1728+
facts.push(retract(current as Consumer.Assertion));
17851729
}
17861730
}
17871731
if (value.labels !== undefined) {
17881732
const currentLabel = workspace.get({ id: uri, type: LABEL_TYPE });
17891733
if (!deepEqual(currentLabel?.is, value.labels)) {
17901734
if (value.labels !== undefined) {
1791-
changes.push({
1792-
of: uri,
1735+
facts.push(assert({
17931736
the: LABEL_TYPE,
1737+
of: uri,
17941738
is: value.labels as JSONValue,
1795-
});
1739+
// If fact has no `cause` it is unclaimed fact.
1740+
cause: currentLabel?.cause ? currentLabel : null,
1741+
}));
17961742
} else {
1797-
changes.push({ of: uri, the: LABEL_TYPE });
1743+
facts.push(retract(currentLabel as Consumer.Assertion));
17981744
}
17991745
}
18001746
}
18011747
}
1802-
1803-
if (changes.length > 0) {
1804-
const result = await this.workspace.push(changes);
1748+
// If we don't have any writes, don't bother sending it.
1749+
if (facts.length > 0) {
1750+
const result = await this.workspace.commit({ facts, claims: [] });
18051751
return result.error ? result : { ok: {} };
18061752
} else {
18071753
return { ok: {} };

0 commit comments

Comments
 (0)