Skip to content

Commit fc446f6

Browse files
authored
OpenTelemetry for toolshed + common-memory (#545)
* Add OpenTelemetry instrumentation to common-memory * Add OpenTelemetry tracing to Toolshed
1 parent 64c4039 commit fc446f6

File tree

14 files changed

+933
-240
lines changed

14 files changed

+933
-240
lines changed

typescript/packages/common-memory/deno.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
},
3131
"imports": {
3232
"@db/sqlite": "jsr:@db/sqlite@^0.12.0",
33+
"@opentelemetry/api": "npm:@opentelemetry/api@^1.9.0",
3334
"@std/assert": "jsr:@std/assert@1",
3435
"@std/fs": "jsr:@std/fs@^1.0.10",
3536
"merkle-reference": "npm:merkle-reference@^2.0.1",

typescript/packages/common-memory/lib.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,4 @@ export * as ChangesBuilder from "./changes.ts";
1212
export * as TransactionBuilder from "./transaction.ts";
1313
export * as CommitBuilder from "./commit.ts";
1414
export * as Principal from "./principal.ts";
15+
export * as Telemetry from "./telemetry.ts";
Lines changed: 129 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,13 @@
11
import * as Space from "./space.ts";
22
import * as Error from "./error.ts";
33
import * as FS from "@std/fs";
4+
import {
5+
addChangesAttributes,
6+
addMemoryAttributes,
7+
recordResult,
8+
traceAsync,
9+
traceSync,
10+
} from "./telemetry.ts";
411
import {
512
AsyncResult,
613
ConnectionError,
@@ -45,15 +52,18 @@ export class Memory implements Session, MemorySession {
4552
* which seems to cause problems if query and transaction happen concurrently.
4653
*/
4754
async perform<Out>(task: () => Promise<Out>): Promise<Out> {
48-
const result = this.ready.finally().then(task);
49-
this.ready = result.finally();
50-
await this.ready;
51-
return await result;
55+
return await traceAsync("memory.perform", async (span) => {
56+
const result = this.ready.finally().then(task);
57+
this.ready = result.finally();
58+
await this.ready;
59+
return await result;
60+
});
5261
}
5362

5463
subscribe(subscriber: Subscriber): SubscribeResult {
5564
return subscribe(this, subscriber);
5665
}
66+
5767
unsubscribe(subscriber: Subscriber): SubscribeResult {
5868
return unsubscribe(this, subscriber);
5969
}
@@ -72,65 +82,116 @@ export class Memory implements Session, MemorySession {
7282
}
7383

7484
export const subscribe = (session: Session, subscriber: Subscriber) => {
75-
session.subscribers.add(subscriber);
76-
return { ok: {} };
85+
return traceSync("memory.subscribe", (span) => {
86+
addMemoryAttributes(span, { operation: "subscribe" });
87+
session.subscribers.add(subscriber);
88+
span.setAttribute("memory.subscriber_count", session.subscribers.size);
89+
return { ok: {} };
90+
});
7791
};
7892

7993
export const unsubscribe = (session: Session, subscriber: Subscriber) => {
80-
session.subscribers.delete(subscriber);
81-
return { ok: {} };
94+
return traceSync("memory.unsubscribe", (span) => {
95+
addMemoryAttributes(span, { operation: "unsubscribe" });
96+
session.subscribers.delete(subscriber);
97+
span.setAttribute("memory.subscriber_count", session.subscribers.size);
98+
return { ok: {} };
99+
});
82100
};
83101

84102
export const query = async (session: Session, query: Query) => {
85-
const { ok: space, error } = await mount(session, query.sub);
86-
if (error) {
87-
return { error };
88-
}
103+
return await traceAsync("memory.query", async (span) => {
104+
addMemoryAttributes(span, {
105+
operation: "query",
106+
space: query.sub,
107+
});
108+
109+
const { ok: space, error } = await mount(session, query.sub);
110+
if (error) {
111+
span.setAttribute("mount.status", "error");
112+
return { error };
113+
}
89114

90-
return space.query(query);
115+
span.setAttribute("mount.status", "success");
116+
return space.query(query);
117+
});
91118
};
92119

93120
export const transact = async (session: Session, transaction: Transaction) => {
94-
const { ok: space, error } = await mount(session, transaction.sub);
95-
if (error) {
96-
return { error };
97-
}
121+
return await traceAsync("memory.transact", async (span) => {
122+
addMemoryAttributes(span, {
123+
operation: "transact",
124+
space: transaction.sub,
125+
});
98126

99-
const result = space.transact(transaction);
127+
if (transaction.args?.changes) {
128+
addChangesAttributes(span, transaction.args.changes);
129+
}
100130

101-
if (result.error) {
102-
return result;
103-
} else {
104-
// Notify all the relevant subscribers.
105-
const promises = [];
106-
for (const subscriber of session.subscribers) {
107-
promises.push(subscriber.transact(transaction));
131+
const { ok: space, error } = await mount(session, transaction.sub);
132+
if (error) {
133+
span.setAttribute("mount.status", "error");
134+
return { error };
108135
}
109-
await Promise.all(promises);
110-
}
111136

112-
return result;
137+
span.setAttribute("mount.status", "success");
138+
const result = space.transact(transaction);
139+
140+
if (result.error) {
141+
return result;
142+
} else {
143+
return await traceAsync(
144+
"memory.notify_subscribers",
145+
async (notifySpan) => {
146+
notifySpan.setAttribute(
147+
"memory.subscriber_count",
148+
session.subscribers.size,
149+
);
150+
151+
const promises = [];
152+
for (const subscriber of session.subscribers) {
153+
promises.push(subscriber.transact(transaction));
154+
}
155+
await Promise.all(promises);
156+
157+
return result;
158+
},
159+
);
160+
}
161+
});
113162
};
114163

115164
export const mount = async (
116165
session: Session,
117166
subject: Subject,
118167
): Promise<Result<SpaceSession, ConnectionError>> => {
119-
const space = session.spaces.get(subject);
120-
if (space) {
121-
return { ok: space };
122-
} else {
123-
const result = await Space.open({
124-
url: new URL(`./${subject}.sqlite`, session.store),
168+
return await traceAsync("memory.mount", async (span) => {
169+
addMemoryAttributes(span, {
170+
operation: "mount",
171+
space: subject,
125172
});
126173

127-
if (result.error) {
128-
return result;
174+
const space = session.spaces.get(subject);
175+
if (space) {
176+
span.setAttribute("memory.mount.cache", "hit");
177+
return { ok: space };
178+
} else {
179+
span.setAttribute("memory.mount.cache", "miss");
180+
181+
const result = await Space.open({
182+
url: new URL(`./${subject}.sqlite`, session.store),
183+
});
184+
185+
if (result.error) {
186+
return result;
187+
}
188+
189+
const replica = result.ok as SpaceSession;
190+
session.spaces.set(subject, replica);
191+
span.setAttribute("memory.spaces_count", session.spaces.size);
192+
return { ok: replica };
129193
}
130-
const replica = result.ok as SpaceSession;
131-
session.spaces.set(subject, replica);
132-
return { ok: replica };
133-
}
194+
});
134195
};
135196

136197
export interface Options {
@@ -140,27 +201,38 @@ export interface Options {
140201
export const open = async (
141202
options: Options,
142203
): AsyncResult<Memory, ConnectionError> => {
143-
try {
144-
if (options.store.protocol === "file:") {
145-
await FS.ensureDir(options.store);
204+
return await traceAsync("memory.open", async (span) => {
205+
addMemoryAttributes(span, { operation: "open" });
206+
span.setAttribute("memory.store_url", options.store.toString());
207+
208+
try {
209+
if (options.store.protocol === "file:") {
210+
await FS.ensureDir(options.store);
211+
}
212+
return { ok: await new Memory(options) };
213+
} catch (cause) {
214+
return { error: Error.connection(options.store, cause as SystemError) };
146215
}
147-
return { ok: await new Memory(options) };
148-
} catch (cause) {
149-
return { error: Error.connection(options.store, cause as SystemError) };
150-
}
216+
});
151217
};
152218

153219
export const close = async (session: Session) => {
154-
const promises = [];
155-
for (const replica of session.spaces.values()) {
156-
promises.push(replica.close());
157-
}
220+
return await traceAsync("memory.close", async (span) => {
221+
addMemoryAttributes(span, { operation: "close" });
222+
span.setAttribute("memory.spaces_count", session.spaces.size);
223+
span.setAttribute("memory.subscriber_count", session.subscribers.size);
158224

159-
for (const subscriber of session.subscribers) {
160-
promises.push(subscriber.close());
161-
}
225+
const promises = [];
226+
for (const replica of session.spaces.values()) {
227+
promises.push(replica.close());
228+
}
229+
230+
for (const subscriber of session.subscribers) {
231+
promises.push(subscriber.close());
232+
}
162233

163-
const results = await Promise.all(promises);
164-
const result = results.find((result) => result?.error);
165-
return result ?? { ok: {} };
234+
const results = await Promise.all(promises);
235+
const result = results.find((result) => result?.error);
236+
return result ?? { ok: {} };
237+
});
166238
};

0 commit comments

Comments
 (0)