diff --git a/packages/memory/space-schema.ts b/packages/memory/space-schema.ts index 76337ea3b..236bfdf54 100644 --- a/packages/memory/space-schema.ts +++ b/packages/memory/space-schema.ts @@ -348,18 +348,16 @@ function addToSelection( } // Get the ValueEntry objects for the facts that match our selector -function getMatchingFacts( +function* getMatchingFacts( session: SpaceStoreSession, factSelector: FactSelector, ): Iterable { - const results = []; for (const fact of selectFacts(session, factSelector)) { - results.push({ + yield { value: fact.is, address: { id: fact.of, type: fact.the, path: [] }, cause: fact.cause, since: fact.since, - }); + }; } - return results; } diff --git a/packages/memory/space.ts b/packages/memory/space.ts index 3c683f38e..9223b0620 100644 --- a/packages/memory/space.ts +++ b/packages/memory/space.ts @@ -570,15 +570,19 @@ export const selectFacts = function* ( { store }: Session, { the, of, cause, is, since }: FactSelector, ): Iterable { - const rows = store.prepare(EXPORT).all({ - the: the === SelectAllString ? null : the, - of: of === SelectAllString ? null : of, - cause: cause === SelectAllString ? null : cause, - is: is === undefined ? null : {}, - since: since ?? null, - }) as StateRow[]; - - for (const row of rows) { + const stmt = store.prepare(EXPORT); + // Note: Cannot finalize() in a generator function's finally block because + // the finally block runs when the generator returns (immediately), not when + // it's exhausted. The statement will be finalized when the store is closed. + for ( + const row of stmt.iter({ + the: the === SelectAllString ? null : the, + of: of === SelectAllString ? null : of, + cause: cause === SelectAllString ? null : cause, + is: is === undefined ? null : {}, + since: since ?? null, + }) as Iterable + ) { yield toFact(row); } }; @@ -587,14 +591,23 @@ export const selectFact = function ( { store }: Session, { the, of, since }: { the: MIME; of: URI; since?: number }, ): SelectedFact | undefined { - const rows = store.prepare(EXPORT).all({ - the: the, - of: of, - cause: null, - is: null, - since: since ?? null, - }) as StateRow[]; - return (rows.length > 0) ? toFact(rows[0]) : undefined; + const stmt = store.prepare(EXPORT); + try { + for ( + const row of stmt.iter({ + the: the, + of: of, + cause: null, + is: null, + since: since ?? null, + }) as Iterable + ) { + return toFact(row); + } + return undefined; + } finally { + stmt.finalize(); + } }; /** diff --git a/packages/runner/integration/array_push.test.ts b/packages/runner/integration/array_push.test.ts index b6a2c8c76..17f734a62 100644 --- a/packages/runner/integration/array_push.test.ts +++ b/packages/runner/integration/array_push.test.ts @@ -21,19 +21,12 @@ const MEMORY_WS_URL = `${ const SPACE_NAME = "runner_integration"; const TOTAL_COUNT = 20; // how many elements we push to the array -const TIMEOUT_MS = 30000; // timeout for the test in ms +const TIMEOUT_MS = 180000; // timeout for the test in ms (3 minutes) console.log("Array Push Test"); console.log(`Connecting to: ${MEMORY_WS_URL}`); console.log(`API URL: ${API_URL}`); -// Set up timeout -const timeoutPromise = new Promise((_, reject) => { - setTimeout(() => { - reject(new Error(`Test timed out after ${TIMEOUT_MS}ms`)); - }, TIMEOUT_MS); -}); - // Main test function async function runTest() { const account = await Identity.fromPassphrase(ANYONE); @@ -192,12 +185,23 @@ async function runTest() { } // Run the test with timeout -try { - await Promise.race([runTest(), timeoutPromise]); - console.log("Test completed successfully within timeout"); - Deno.exit(0); -} catch (error) { - const errorMessage = error instanceof Error ? error.message : String(error); - console.error("Test failed:", errorMessage, (error as Error).stack); - Deno.exit(1); -} +Deno.test({ + name: "array push test", + fn: async () => { + let timeoutHandle: number; + const timeoutPromise = new Promise((_, reject) => { + timeoutHandle = setTimeout(() => { + reject(new Error(`Test timed out after ${TIMEOUT_MS}ms`)); + }, TIMEOUT_MS); + }); + + try { + await Promise.race([runTest(), timeoutPromise]); + console.log("Test completed successfully within timeout"); + } finally { + clearTimeout(timeoutHandle!); + } + }, + sanitizeResources: false, + sanitizeOps: false, +}); diff --git a/packages/runner/integration/basic-persistence.test.ts b/packages/runner/integration/basic-persistence.test.ts index b3faef078..ecbb4d8be 100644 --- a/packages/runner/integration/basic-persistence.test.ts +++ b/packages/runner/integration/basic-persistence.test.ts @@ -15,6 +15,8 @@ const identity = await Identity.fromPassphrase("test operator", keyConfig); console.log("\n=== TEST: Simple object persistence ==="); +const TIMEOUT_MS = 180000; // 3 minutes timeout + async function test() { // First runtime - save data const runtime1 = new Runtime({ @@ -67,16 +69,37 @@ async function test() { return [cell1Contents, cell2Contents]; } -for (let i: number = 1; i <= 20; i++) { - const [result1, result2] = await test(); - if (!deepEqual(result1, result2)) { - console.error("Mismatched results for iteration", i, result1, result2); - Deno.exit(1); - } - if (i % 5 == 0) { - console.log("completed", i, "..."); +async function runTest() { + for (let i: number = 1; i <= 20; i++) { + const [result1, result2] = await test(); + if (!deepEqual(result1, result2)) { + console.error("Mismatched results for iteration", i, result1, result2); + throw new Error(`Mismatched results for iteration ${i}`); + } + if (i % 5 == 0) { + console.log("completed", i, "..."); + } } + + console.log("\nDone"); } -console.log("\nDone"); -Deno.exit(0); +Deno.test({ + name: "basic persistence test", + fn: async () => { + let timeoutHandle: number; + const timeoutPromise = new Promise((_, reject) => { + timeoutHandle = setTimeout(() => { + reject(new Error(`Test timed out after ${TIMEOUT_MS}ms`)); + }, TIMEOUT_MS); + }); + + try { + await Promise.race([runTest(), timeoutPromise]); + } finally { + clearTimeout(timeoutHandle!); + } + }, + sanitizeResources: false, + sanitizeOps: false, +}); diff --git a/packages/runner/integration/derive_array_leak.test.ts b/packages/runner/integration/derive_array_leak.test.ts new file mode 100644 index 000000000..a2b4cd66d --- /dev/null +++ b/packages/runner/integration/derive_array_leak.test.ts @@ -0,0 +1,231 @@ +#!/usr/bin/env -S deno run --allow-net --allow-env --allow-read --allow-run + +/** + * Integration test to reproduce memory leak when using derive() with array.map() + * + * Bug: When a cell updates repeatedly and a derived array is created for each update, + * the old derived arrays are never garbage collected, causing unbounded memory growth. + * + * Expected behavior: Memory should stabilize or grow modestly + * Actual behavior: Memory grows by gigabytes (1GB+ per 100 increments) + */ +import { ANYONE, Identity, Session } from "@commontools/identity"; +import { env } from "@commontools/integration"; +import { StorageManager } from "../src/storage/cache.ts"; +import { Runtime } from "../src/index.ts"; +import { CharmManager, compileRecipe } from "@commontools/charm"; + +(Error as any).stackTraceLimit = 100; + +const { API_URL } = env; +const SPACE_NAME = "runner_integration"; +const TIMEOUT_MS = 300000; + +// Test parameters +const INCREMENTS_PER_CLICK = 50; // How many times each click increments (must match .tsx file) +const MAX_MEMORY_INCREASE_RATIO = 2.0; // Fail ratio + +console.log("Derive Array Leak Test"); +console.log(`Connecting to: ${API_URL}`); +console.log(`Will increment ${INCREMENTS_PER_CLICK} times in one click`); + +// Helper to get server process memory (portable across Linux and macOS) +async function getServerMemoryMB(): Promise { + // Try to find toolshed process - it could be either: + // 1. Compiled binary: "toolshed" (in CI) + // 2. Deno script: "deno run --unstable-otel" (local dev) + let pid: string | undefined; + + // First try compiled binary (CI) + const pgrepBinary = new Deno.Command("pgrep", { + args: ["-f", "toolshed"], + stdout: "piped", + }); + const { stdout: binaryOut, code: binaryCode } = await pgrepBinary.output(); + + if (binaryCode === 0 && binaryOut && binaryOut.length > 0) { + const pids = new TextDecoder().decode(binaryOut).trim().split("\n"); + // Find the toolshed process (not grep itself) + pid = pids.find((p) => p && p.trim() !== ""); + } + + // If not found, try Deno script (local dev) + if (!pid) { + const pgrepDeno = new Deno.Command("pgrep", { + args: ["-f", "deno run --unstable-otel"], + stdout: "piped", + }); + const { stdout: denoOut, code: denoCode } = await pgrepDeno.output(); + + if (denoCode === 0 && denoOut && denoOut.length > 0) { + pid = new TextDecoder().decode(denoOut).trim().split("\n")[0]; + } + } + + if (!pid || pid.trim() === "") { + throw new Error( + "Could not find toolshed server process. Make sure the server is running.", + ); + } + + // Then get RSS using portable ps -o format (works on Linux, macOS, BSD) + const psProcess = new Deno.Command("ps", { + args: ["-p", pid, "-o", "rss="], + stdout: "piped", + }); + const { stdout: psOut } = await psProcess.output(); + const rssKB = parseInt(new TextDecoder().decode(psOut).trim()); + + return rssKB / 1024; // Convert KB to MB +} + +// Main test function +async function runTest() { + const account = await Identity.fromPassphrase(ANYONE); + const space_thingy = await account.derive(SPACE_NAME); + const space_thingy_space = space_thingy.did(); + const session = { + private: false, + name: SPACE_NAME, + space: space_thingy_space, + as: space_thingy, + } as Session; + + // Create storage manager + const storageManager = StorageManager.open({ + as: session.as, + address: new URL("/api/storage/memory", API_URL), + }); + + // Create runtime + const runtime = new Runtime({ + apiUrl: new URL(API_URL), + storageManager, + }); + + // Create charm manager for the specified space + const charmManager = new CharmManager(session, runtime); + await charmManager.ready; + + // Read the recipe file content + const recipeContent = await Deno.readTextFile( + "./integration/derive_array_leak.test.tsx", + ); + + const recipe = await compileRecipe( + recipeContent, + "recipe", + runtime, + space_thingy_space, + ); + console.log("Recipe compiled successfully"); + + const charm = (await charmManager.runPersistent(recipe, {})).asSchema({ + type: "object", + properties: { + value: { type: "number" }, + increment: { + asStream: true, + }, + }, + required: ["value", "increment"], + }); + console.log("Charm created:", charm.entityId); + + // Wait for initial state + await runtime.idle(); + await runtime.storageManager.synced(); + + // Give it 5 seconds to settle after initialization + console.log("Waiting 5 seconds for memory to settle..."); + await new Promise((resolve) => setTimeout(resolve, 5000)); + + // Measure baseline server memory (where the leak occurs) + const serverMemoryBeforeMB = await getServerMemoryMB(); + console.log(`Baseline server memory: ${serverMemoryBeforeMB.toFixed(1)} MB`); + console.log(`Initial counter value: ${charm.get().value}`); + + // Trigger the leak by incrementing + // The handler increments INCREMENTS_PER_CLICK times per click + console.log( + `Clicking increment (${INCREMENTS_PER_CLICK} increments total)...`, + ); + const incrementStream = charm.key("increment"); + incrementStream.send({}); + + // Wait for all updates to complete + console.log("Waiting for runtime to finish..."); + await runtime.idle(); + await runtime.storageManager.synced(); + console.log(`Final counter value: ${charm.get().value}`); + + // Verify the counter actually incremented + const finalValue = charm.get().value; + const expectedValue = INCREMENTS_PER_CLICK; + if (finalValue !== expectedValue) { + console.warn( + `WARNING: Counter value is ${finalValue}, expected ${expectedValue}. ` + + `This may indicate the derive action failed due to array size.`, + ); + } + + // Measure server memory after operations complete + const serverMemoryAfterMB = await getServerMemoryMB(); + const serverMemoryIncreaseMB = serverMemoryAfterMB - serverMemoryBeforeMB; + const memoryRatio = serverMemoryAfterMB / serverMemoryBeforeMB; + + console.log(`Final server memory: ${serverMemoryAfterMB.toFixed(1)} MB`); + console.log( + `Server memory increase: ${serverMemoryIncreaseMB.toFixed(1)} MB (${ + ((memoryRatio - 1) * 100).toFixed(1) + }% increase)`, + ); + console.log(`Memory ratio: ${memoryRatio.toFixed(2)}x`); + + // Clean up + await runtime.dispose(); + await storageManager.close(); + + // Check if server memory increase indicates a leak + if (memoryRatio > MAX_MEMORY_INCREASE_RATIO) { + console.error( + `FAIL: Server memory increased to ${memoryRatio.toFixed(2)}x baseline, ` + + `exceeds limit of ${MAX_MEMORY_INCREASE_RATIO}x`, + ); + console.error("This indicates a memory leak is present"); + throw new Error( + `Memory leak detected: ${ + memoryRatio.toFixed(2) + }x increase (limit: ${MAX_MEMORY_INCREASE_RATIO}x)`, + ); + } + + console.log( + `PASS: Memory increase ${ + memoryRatio.toFixed(2) + }x is within acceptable limit (< ${MAX_MEMORY_INCREASE_RATIO}x)`, + ); + console.log(`Counter reached ${finalValue} (expected ${expectedValue})`); +} + +// Run the test with timeout +Deno.test({ + name: "derive array leak test", + fn: async () => { + let timeoutHandle: number; + const timeoutPromise = new Promise((_, reject) => { + timeoutHandle = setTimeout(() => { + reject(new Error(`Test timed out after ${TIMEOUT_MS}ms`)); + }, TIMEOUT_MS); + }); + + try { + await Promise.race([runTest(), timeoutPromise]); + console.log("Test completed successfully"); + } finally { + clearTimeout(timeoutHandle!); + } + }, + sanitizeResources: false, + sanitizeOps: false, +}); diff --git a/packages/runner/integration/derive_array_leak.test.tsx b/packages/runner/integration/derive_array_leak.test.tsx new file mode 100644 index 000000000..34361d4a4 --- /dev/null +++ b/packages/runner/integration/derive_array_leak.test.tsx @@ -0,0 +1,90 @@ +/// +import { + Cell, + Default, + derive, + handler, + NAME, + recipe, + str, + Stream, + UI, +} from "commontools"; + +// How many times to increment per click +const INCREMENTS_PER_CLICK = 50; + +interface RecipeState { + value: Default; +} + +interface RecipeOutput { + value: Default; + increment: Stream; + decrement: Stream; +} + +// Inline handlers to avoid import resolution issues +const increment = handler< + unknown, + { value: Cell } +>( + (_args, state) => { + // Increment multiple times per click to trigger derive() multiple times + for (let i = 0; i < INCREMENTS_PER_CLICK; i++) { + state.value.set(state.value.get() + 1); + } + }, +); + +const decrement = handler< + unknown, + { value: Cell } +>( + (_args, state) => { + state.value.set(state.value.get() - 1); + }, +); + +function nth(value: number) { + if (value === 1) return "1st"; + if (value === 2) return "2nd"; + if (value === 3) return "3rd"; + return `${value}th`; +} + +function previous(value: number) { + return value - 1; +} + +export default recipe("Counter", (state) => { + const array = derive(state.value, (value: number) => { + // Clamp to prevent negative array length + const length = Math.max(0, value); + return new Array(length).fill(0); + }); + return { + [NAME]: str`Simple counter: ${state.value}`, + [UI]: ( +
+
+ + dec to {previous(state.value)} + + + Counter is the {nth(state.value)} number + + + inc to {state.value + 1} + +
+
+ {array.map((v: number, i: number) => {v % 10})} +
+
+ ), + value: state.value, + increment: increment(state) as unknown as Stream, + decrement: decrement(state) as unknown as Stream, + }; +}); diff --git a/packages/runner/integration/pending-nursery.test.ts b/packages/runner/integration/pending-nursery.test.ts index 664b28065..a391a5a58 100644 --- a/packages/runner/integration/pending-nursery.test.ts +++ b/packages/runner/integration/pending-nursery.test.ts @@ -17,6 +17,8 @@ const identity = await Identity.fromPassphrase("test operator", keyConfig); console.log("\n=== TEST: Simple object persistence ==="); +const TIMEOUT_MS = 180000; // 3 minutes timeout + async function test() { // First runtime - save data const runtime1 = new Runtime({ @@ -141,10 +143,31 @@ async function test() { await runtime2.dispose(); } -for (let i: number = 1; i <= 20; i++) { - await test(); - console.log("completed", i, "..."); +async function runTest() { + for (let i: number = 1; i <= 20; i++) { + await test(); + console.log("completed", i, "..."); + } + + console.log("\nDone"); } -console.log("\nDone"); -Deno.exit(0); +Deno.test({ + name: "pending nursery test", + fn: async () => { + let timeoutHandle: number; + const timeoutPromise = new Promise((_, reject) => { + timeoutHandle = setTimeout(() => { + reject(new Error(`Test timed out after ${TIMEOUT_MS}ms`)); + }, TIMEOUT_MS); + }); + + try { + await Promise.race([runTest(), timeoutPromise]); + } finally { + clearTimeout(timeoutHandle!); + } + }, + sanitizeResources: false, + sanitizeOps: false, +}); diff --git a/packages/runner/integration/reconnection.test.ts b/packages/runner/integration/reconnection.test.ts index 5c7888860..698c65069 100644 --- a/packages/runner/integration/reconnection.test.ts +++ b/packages/runner/integration/reconnection.test.ts @@ -13,230 +13,256 @@ const { API_URL } = env; const MEMORY_WS_URL = `${ API_URL.replace("http://", "ws://") -}/api/storage/memory`; +}api/storage/memory`; const TEST_DOC_ID = "test-reconnection-counter"; -console.log("Schema Query Reconnection Integration Test"); -console.log(`Connecting to: ${MEMORY_WS_URL}`); - -// Create test identity -const signer = await Identity.fromPassphrase("test operator"); - -// Create storage manager -const storageManager1 = StorageManager.open({ - as: signer, - address: new URL(MEMORY_WS_URL), - id: "provider1-reconnect-test", -}); - -// Open provider -const provider1 = storageManager1.open(signer.did()); -console.log(`Connected to memory server`); - -// Define test schema -const testSchemaContext: SchemaContext = { - schema: { - type: "object", - properties: { - value: { type: "number" }, - timestamp: { type: "string" }, - }, - required: ["value"], - }, - rootSchema: { - type: "object", - properties: { - value: { type: "number" }, - timestamp: { type: "string" }, - }, - required: ["value"], - }, -}; -const testSelector = { path: [], schemaContext: testSchemaContext }; - -interface UpdateValue { - value: number; - timestamp: string; -} - -// Track updates for each provider -let updateCount1 = 0; -const updates1: UpdateValue[] = []; -let updateCount3 = 0; -const updates3: UpdateValue[] = []; - -// Create a third provider that stays connected (control) -const storageManager3 = StorageManager.open({ - as: signer, - address: new URL(MEMORY_WS_URL), - id: "provider3-control-test", -}); -const provider3 = storageManager3.open(signer.did()); -console.log(`Provider3 (control) connected to memory server`); -const uri: URI = `of:${TEST_DOC_ID}`; -// Listen for updates on the test-reconnection-counter document -// Note: this is not the schema subscription, its just a client-side listener -provider1.sink(uri, (value) => { - updateCount1++; - updates1.push(value.value); - console.log(`Provider1 Update #${updateCount1}:`, value.value); -}); - -provider3.sink(uri, (value) => { - updateCount3++; - updates3.push(value.value); - console.log(`Provider3 Update #${updateCount3}:`, value.value); -}); - -// Establish server-side subscription with schema -console.log("Establishing subscriptions..."); -await provider1.sync(uri, testSelector); -await provider3.sync(uri, testSelector); - -// Send initial value to server -console.log("Sending initial value..."); -await provider1.send([{ - uri, - value: { - value: { - value: 1, - timestamp: new Date().toISOString(), - }, - }, -}]); - -// Wait to give server time to send us back the update -await new Promise((resolve) => setTimeout(resolve, 1000)); - -// Check if our listeners were called via the subscription -if (updateCount1 === 0 || updateCount3 === 0) { - console.error( - `FAILED: No initial update received. Provider1: ${updateCount1}, Provider3: ${updateCount3}`, - ); - Deno.exit(1); -} - -console.log("Initial updates received by both providers"); - -// Test reconnection behavior -console.log("\nTesting reconnection behavior..."); - -// Access the WebSocket connection -- it's private so we use any -const providerSocket = (provider1 as any).connection as WebSocket | undefined; -console.log("WebSocket state:", providerSocket?.readyState); - -// Force disconnect the WebSocket -console.log("Forcing WebSocket disconnection..."); -if (providerSocket) { - providerSocket.close(); - console.log("WebSocket closed"); -} else { - console.error("No WebSocket connection found"); - Deno.exit(1); -} - -// Monitor reconnection and updates -let testValue = 100; // Use values over 100 to show the value happens after reconnection -const preDisconnectCount1 = updateCount1; -const preDisconnectCount3 = updateCount3; - -// Give it a moment to reconnect -await new Promise((resolve) => setTimeout(resolve, 10000)); - -// Create a second storage manager and provider -const storageManager2 = StorageManager.open({ - as: signer, - address: new URL(MEMORY_WS_URL), - id: "provider2-reconnect-test", -}); - -// Open provider -const provider2 = storageManager2.open(signer.did()); -console.log(`Connected to memory server as second client`); - -// Establish server-side subscription with schema -console.log("Establishing subscription as second client..."); -await provider2.sync(uri, testSelector); +Deno.test({ + name: "schema query reconnection test", + ignore: true, + fn: async () => { + console.log("Schema Query Reconnection Integration Test"); + console.log(`Connecting to: ${MEMORY_WS_URL}`); + + // Create test identity + const signer = await Identity.fromPassphrase("test operator"); + + // Create storage manager + const storageManager1 = StorageManager.open({ + as: signer, + address: new URL(MEMORY_WS_URL), + id: "provider1-reconnect-test", + }); + + // Open provider + const provider1 = storageManager1.open(signer.did()); + console.log(`Connected to memory server`); + + // Define test schema + const testSchemaContext: SchemaContext = { + schema: { + type: "object", + properties: { + value: { type: "number" }, + timestamp: { type: "string" }, + }, + required: ["value"], + }, + rootSchema: { + type: "object", + properties: { + value: { type: "number" }, + timestamp: { type: "string" }, + }, + required: ["value"], + }, + }; + const testSelector = { path: [], schemaContext: testSchemaContext }; -// Send test updates and check if subscription still works -console.log("Sending test updates after disconnection..."); + interface UpdateValue { + value: number; + timestamp: string; + } -const intervalId = setInterval(async () => { - try { - // Send an update as the second - const result = await provider2.send([{ + // Track updates for each provider + let updateCount1 = 0; + const updates1: UpdateValue[] = []; + let updateCount3 = 0; + const updates3: UpdateValue[] = []; + + // Create a third provider that stays connected (control) + const storageManager3 = StorageManager.open({ + as: signer, + address: new URL(MEMORY_WS_URL), + id: "provider3-control-test", + }); + const provider3 = storageManager3.open(signer.did()); + console.log(`Provider3 (control) connected to memory server`); + const uri: URI = `of:${TEST_DOC_ID}`; + // Listen for updates on the test-reconnection-counter document + // Note: this is not the schema subscription, its just a client-side listener + provider1.sink(uri, (value) => { + updateCount1++; + updates1.push(value.value); + console.log(`Provider1 Update #${updateCount1}:`, value.value); + }); + + provider3.sink(uri, (value) => { + updateCount3++; + updates3.push(value.value); + console.log(`Provider3 Update #${updateCount3}:`, value.value); + }); + + // Establish server-side subscription with schema + console.log("Establishing subscriptions..."); + await provider1.sync(uri, testSelector); + await provider3.sync(uri, testSelector); + + // Send initial value to server + console.log("Sending initial value..."); + await provider1.send([{ uri, value: { value: { - value: testValue++, + value: 1, timestamp: new Date().toISOString(), }, }, }]); - console.log(result); - - // Check if we've received updates with value >= 100 (post-reconnection) - const postReconnectUpdates1 = updates1.filter((u) => u.value >= 100); - const postReconnectUpdates3 = updates3.filter((u) => u.value >= 100); - - console.log( - `Status - Provider1 post-reconnect updates: ${postReconnectUpdates1.length}, Provider3: ${postReconnectUpdates3.length}`, - ); - if ( - postReconnectUpdates1.length >= 3 && postReconnectUpdates3.length >= 3 - ) { - console.log( - "SUCCESS: Both providers received updates after reconnection!", - ); - console.log( - `Provider1 - Total: ${updateCount1}, Pre-disconnect: ${preDisconnectCount1}, Post-reconnect: ${postReconnectUpdates1.length}`, - ); - console.log( - `Provider3 - Total: ${updateCount3}, Pre-disconnect: ${preDisconnectCount3}, Post-reconnect: ${postReconnectUpdates3.length}`, - ); + // Wait to give server time to send us back the update + await new Promise((resolve) => setTimeout(resolve, 1000)); - clearInterval(intervalId); - storageManager1.close(); - storageManager2.close(); - storageManager3.close(); - Deno.exit(0); - } else if ( - postReconnectUpdates3.length >= 3 && postReconnectUpdates1.length === 0 - ) { - console.log( - `Provider1 - Total: ${updateCount1}, Pre-disconnect: ${preDisconnectCount1}, Post-reconnect: ${postReconnectUpdates1.length}`, + // Check if our listeners were called via the subscription + if (updateCount1 === 0 || updateCount3 === 0) { + console.error( + `FAILED: No initial update received. Provider1: ${updateCount1}, Provider3: ${updateCount3}`, ); - console.log( - `Provider3 - Total: ${updateCount3}, Pre-disconnect: ${preDisconnectCount3}, Post-reconnect: ${postReconnectUpdates3.length}`, + throw new Error( + `No initial update received. Provider1: ${updateCount1}, Provider3: ${updateCount3}`, ); + } - clearInterval(intervalId); - storageManager1.close(); - storageManager2.close(); - storageManager3.close(); - Deno.exit(1); + console.log("Initial updates received by both providers"); + + // Test reconnection behavior + console.log("\nTesting reconnection behavior..."); + + // Access the WebSocket connection -- it's private so we use any + const providerSocket = (provider1 as any).connection as + | WebSocket + | undefined; + console.log("WebSocket state:", providerSocket?.readyState); + + // Force disconnect the WebSocket + console.log("Forcing WebSocket disconnection..."); + if (providerSocket) { + providerSocket.close(); + console.log("WebSocket closed"); + } else { + console.error("No WebSocket connection found"); + throw new Error("No WebSocket connection found"); } - } catch (error) { - console.log( - "Error sending update:", - error instanceof Error ? error.message : String(error), - ); - } -}, 1000); - -// Timeout after 30 seconds -setTimeout(() => { - console.error("TIMEOUT: Test did not complete within 30 seconds"); - console.log( - `Final status - Provider1: ${updateCount1} updates, Provider3: ${updateCount3} updates`, - ); - clearInterval(intervalId); - storageManager1.close(); - storageManager2.close(); - storageManager3.close(); - Deno.exit(1); -}, 30000); - -// Keep the process running -await new Promise(() => {}); + + // Monitor reconnection and updates + let testValue = 100; // Use values over 100 to show the value happens after reconnection + const preDisconnectCount1 = updateCount1; + const preDisconnectCount3 = updateCount3; + + // Give it a moment to reconnect + await new Promise((resolve) => setTimeout(resolve, 10000)); + + // Create a second storage manager and provider + const storageManager2 = StorageManager.open({ + as: signer, + address: new URL(MEMORY_WS_URL), + id: "provider2-reconnect-test", + }); + + // Open provider + const provider2 = storageManager2.open(signer.did()); + console.log(`Connected to memory server as second client`); + + // Establish server-side subscription with schema + console.log("Establishing subscription as second client..."); + await provider2.sync(uri, testSelector); + + // Send test updates and check if subscription still works + console.log("Sending test updates after disconnection..."); + + // Create a promise that resolves/rejects when test completes + await new Promise((resolve, reject) => { + let hasResolved = false; + + const cleanup = () => { + if (!hasResolved) { + hasResolved = true; + clearInterval(intervalId); + clearTimeout(timeoutId); + storageManager1.close(); + storageManager2.close(); + storageManager3.close(); + } + }; + + const intervalId = setInterval(async () => { + if (hasResolved) return; // Don't continue if already resolved + + try { + // Send an update as the second + const result = await provider2.send([{ + uri, + value: { + value: { + value: testValue++, + timestamp: new Date().toISOString(), + }, + }, + }]); + console.log(result); + + // Check if we've received updates with value >= 100 (post-reconnection) + const postReconnectUpdates1 = updates1.filter((u) => u.value >= 100); + const postReconnectUpdates3 = updates3.filter((u) => u.value >= 100); + + console.log( + `Status - Provider1 post-reconnect updates: ${postReconnectUpdates1.length}, Provider3: ${postReconnectUpdates3.length}`, + ); + + if ( + postReconnectUpdates1.length >= 3 && + postReconnectUpdates3.length >= 3 + ) { + console.log( + "SUCCESS: Both providers received updates after reconnection!", + ); + console.log( + `Provider1 - Total: ${updateCount1}, Pre-disconnect: ${preDisconnectCount1}, Post-reconnect: ${postReconnectUpdates1.length}`, + ); + console.log( + `Provider3 - Total: ${updateCount3}, Pre-disconnect: ${preDisconnectCount3}, Post-reconnect: ${postReconnectUpdates3.length}`, + ); + + cleanup(); + resolve(); // Test passed + } else if ( + postReconnectUpdates3.length >= 3 && + postReconnectUpdates1.length === 0 + ) { + console.log( + `Provider1 - Total: ${updateCount1}, Pre-disconnect: ${preDisconnectCount1}, Post-reconnect: ${postReconnectUpdates1.length}`, + ); + console.log( + `Provider3 - Total: ${updateCount3}, Pre-disconnect: ${preDisconnectCount3}, Post-reconnect: ${postReconnectUpdates3.length}`, + ); + + cleanup(); + reject( + new Error("Provider1 did not receive updates after reconnection"), + ); + } + } catch (error) { + // If there's an error, fail the test instead of continuing + console.error( + "FATAL ERROR in interval:", + error instanceof Error ? error.message : String(error), + error instanceof Error ? error.stack : "", + ); + cleanup(); + reject(error instanceof Error ? error : new Error(String(error))); + } + }, 1000); + + // Timeout after 30 seconds + const timeoutId = setTimeout(() => { + console.error("TIMEOUT: Test did not complete within 30 seconds"); + console.log( + `Final status - Provider1: ${updateCount1} updates, Provider3: ${updateCount3} updates`, + ); + cleanup(); + reject(new Error("Test did not complete within 30 seconds")); + }, 30000); + }); + }, + sanitizeResources: false, + sanitizeOps: false, +}); diff --git a/packages/runner/integration/sync-schema-path.test.ts b/packages/runner/integration/sync-schema-path.test.ts index 1a5b5e223..e793d59c4 100644 --- a/packages/runner/integration/sync-schema-path.test.ts +++ b/packages/runner/integration/sync-schema-path.test.ts @@ -17,6 +17,8 @@ const identity = await Identity.fromPassphrase("test operator", keyConfig); console.log("\n=== TEST: Sync Schema uses Path ==="); +const TIMEOUT_MS = 180000; // 3 minutes timeout + async function test() { // First runtime - save data const runtime1 = new Runtime({ @@ -127,7 +129,27 @@ async function test() { await runtime2.dispose(); } -await test(); +async function runTest() { + await test(); + console.log("\nDone"); +} + +Deno.test({ + name: "sync schema path test", + fn: async () => { + let timeoutHandle: number; + const timeoutPromise = new Promise((_, reject) => { + timeoutHandle = setTimeout(() => { + reject(new Error(`Test timed out after ${TIMEOUT_MS}ms`)); + }, TIMEOUT_MS); + }); -console.log("\nDone"); -Deno.exit(0); + try { + await Promise.race([runTest(), timeoutPromise]); + } finally { + clearTimeout(timeoutHandle!); + } + }, + sanitizeResources: false, + sanitizeOps: false, +}); diff --git a/packages/runner/integration/traverse_timing.test.ts b/packages/runner/integration/traverse_timing.test.ts index a4b09688e..019358e52 100644 --- a/packages/runner/integration/traverse_timing.test.ts +++ b/packages/runner/integration/traverse_timing.test.ts @@ -180,19 +180,25 @@ function timeFunction(name: string, fn: () => void) { console.log(name, "took", end - start, "ms"); } -timeFunction("initTest", () => { - initTest(objectManager, data); -}); +Deno.test({ + name: "traverse timing test", + fn: () => { + timeFunction("initTest", () => { + initTest(objectManager, data); + }); + + const n = 100; + timeFunction(`traverseLoop${n}`, () => { + for (let i = 0; i < n; i++) { + runTest(objectManager); + if (i === 0) { + console.log("missing docs:", objectManager.getMissingDocs()); + } + } + }); -const n = 100; -timeFunction(`traverseLoop${n}`, () => { - for (let i = 0; i < n; i++) { - runTest(objectManager); - if (i === 0) { - console.log("missing docs:", objectManager.getMissingDocs()); - } - } + console.log("\nDone"); + }, + sanitizeResources: false, + sanitizeOps: false, }); - -console.log("\nDone"); -Deno.exit(0); diff --git a/scripts/run_reconnection_test.sh b/scripts/run_reconnection_test.sh new file mode 100755 index 000000000..755d27238 --- /dev/null +++ b/scripts/run_reconnection_test.sh @@ -0,0 +1,19 @@ +#!/usr/bin/env bash + +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +REPO_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)" + +echo "Stopping dev servers..." +"$SCRIPT_DIR/stop-local-dev.sh" + +echo "Removing cached databases..." +rm -f "$REPO_ROOT/packages/toolshed/cache/memory/did:key:z6Mk"*.sqlite + +echo "Starting dev servers..." +"$SCRIPT_DIR/start-local-dev.sh" + +echo "Running reconnection test..." +cd "$REPO_ROOT/packages/runner" +deno test --allow-all ./integration/reconnection.test.ts