Skip to content

Commit 03d2636

Browse files
fix: decouple stream listeners from stream entry lifecycle (op7418#183)
fix: decouple stream listeners from stream entry lifecycle
1 parent 94fdd36 commit 03d2636

File tree

1 file changed

+25
-45
lines changed

1 file changed

+25
-45
lines changed

src/lib/stream-session-manager.ts

Lines changed: 25 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ interface ActiveStream {
3131
sessionId: string;
3232
abortController: AbortController;
3333
snapshot: SessionStreamSnapshot;
34-
listeners: Set<StreamEventListener>;
3534
idleCheckTimer: ReturnType<typeof setInterval> | null;
3635
lastEventTime: number;
3736
gcTimer: ReturnType<typeof setTimeout> | null;
@@ -67,6 +66,7 @@ export interface StartStreamParams {
6766
// ==========================================
6867

6968
const GLOBAL_KEY = '__streamSessionManager__' as const;
69+
const LISTENERS_KEY = '__streamSessionListeners__' as const;
7070
const STREAM_IDLE_TIMEOUT_MS = 330_000;
7171
const GC_DELAY_MS = 5 * 60 * 1000; // 5 minutes
7272

@@ -77,6 +77,14 @@ function getStreamsMap(): Map<string, ActiveStream> {
7777
return (globalThis as Record<string, unknown>)[GLOBAL_KEY] as Map<string, ActiveStream>;
7878
}
7979

80+
/** Listener registry — persists independently of stream entries so GC doesn't orphan listeners */
81+
function getListenersMap(): Map<string, Set<StreamEventListener>> {
82+
if (!(globalThis as Record<string, unknown>)[LISTENERS_KEY]) {
83+
(globalThis as Record<string, unknown>)[LISTENERS_KEY] = new Map<string, Set<StreamEventListener>>();
84+
}
85+
return (globalThis as Record<string, unknown>)[LISTENERS_KEY] as Map<string, Set<StreamEventListener>>;
86+
}
87+
8088
// ==========================================
8189
// Helpers
8290
// ==========================================
@@ -104,8 +112,11 @@ function emit(stream: ActiveStream, type: StreamEvent['type']) {
104112
const snapshot = buildSnapshot(stream);
105113
stream.snapshot = snapshot; // store latest
106114
const event: StreamEvent = { type, sessionId: stream.sessionId, snapshot };
107-
for (const listener of stream.listeners) {
108-
try { listener(event); } catch { /* listener error */ }
115+
const listeners = getListenersMap().get(stream.sessionId);
116+
if (listeners) {
117+
for (const listener of listeners) {
118+
try { listener(event); } catch { /* listener error */ }
119+
}
109120
}
110121
// Also dispatch window event for AppShell
111122
if (typeof window !== 'undefined') {
@@ -166,7 +177,6 @@ export function startStream(params: StartStreamParams): void {
166177
error: null,
167178
finalMessageContent: null,
168179
},
169-
listeners: existing?.listeners ?? new Set(),
170180
idleCheckTimer: null,
171181
lastEventTime: Date.now(),
172182
gcTimer: null,
@@ -490,49 +500,19 @@ export function stopStream(sessionId: string): void {
490500
// ==========================================
491501

492502
export function subscribe(sessionId: string, listener: StreamEventListener): () => void {
493-
const map = getStreamsMap();
494-
let stream = map.get(sessionId);
495-
496-
if (!stream) {
497-
// Create a placeholder entry to hold listeners even when no stream is active
498-
stream = {
499-
sessionId,
500-
abortController: new AbortController(),
501-
snapshot: {
502-
sessionId,
503-
phase: 'completed' as const,
504-
streamingContent: '',
505-
toolUses: [],
506-
toolResults: [],
507-
streamingToolOutput: '',
508-
statusText: undefined,
509-
pendingPermission: null,
510-
permissionResolved: null,
511-
tokenUsage: null,
512-
startedAt: 0,
513-
completedAt: null,
514-
error: null,
515-
finalMessageContent: null,
516-
},
517-
listeners: new Set(),
518-
idleCheckTimer: null,
519-
lastEventTime: 0,
520-
gcTimer: null,
521-
accumulatedText: '',
522-
toolUsesArray: [],
523-
toolResultsArray: [],
524-
toolOutputAccumulated: '',
525-
toolTimeoutInfo: null,
526-
isIdleTimeout: false,
527-
sendMessageFn: null,
528-
};
529-
map.set(sessionId, stream);
503+
const listenersMap = getListenersMap();
504+
let listeners = listenersMap.get(sessionId);
505+
if (!listeners) {
506+
listeners = new Set();
507+
listenersMap.set(sessionId, listeners);
530508
}
531-
532-
stream.listeners.add(listener);
509+
listeners.add(listener);
533510

534511
return () => {
535-
stream!.listeners.delete(listener);
512+
listeners!.delete(listener);
513+
if (listeners!.size === 0) {
514+
listenersMap.delete(sessionId);
515+
}
536516
};
537517
}
538518

@@ -630,7 +610,7 @@ export function clearSnapshot(sessionId: string): void {
630610
const stream = getStreamsMap().get(sessionId);
631611
if (stream && stream.snapshot.phase !== 'active') {
632612
if (stream.gcTimer) clearTimeout(stream.gcTimer);
633-
// Keep the listeners entry but reset the snapshot
613+
// Reset the snapshot (listeners are in a separate registry)
634614
stream.snapshot = {
635615
...stream.snapshot,
636616
startedAt: 0,

0 commit comments

Comments
 (0)