forked from ultraworkers/claw-code
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsessionIngress.ts
More file actions
514 lines (466 loc) · 16.7 KB
/
Copy pathsessionIngress.ts
File metadata and controls
514 lines (466 loc) · 16.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
import axios, { type AxiosError } from 'axios'
import type { UUID } from 'crypto'
import { getOauthConfig } from '../../constants/oauth.js'
import type { Entry, TranscriptMessage } from '../../types/logs.js'
import { logForDebugging } from '../../utils/debug.js'
import { logForDiagnosticsNoPII } from '../../utils/diagLogs.js'
import { isEnvTruthy } from '../../utils/envUtils.js'
import { logError } from '../../utils/log.js'
import { sequential } from '../../utils/sequential.js'
import { getSessionIngressAuthToken } from '../../utils/sessionIngressAuth.js'
import { sleep } from '../../utils/sleep.js'
import { jsonStringify } from '../../utils/slowOperations.js'
import { getOAuthHeaders } from '../../utils/teleport/api.js'
interface SessionIngressError {
error?: {
message?: string
type?: string
}
}
// Module-level state
const lastUuidMap: Map<string, UUID> = new Map()
const MAX_RETRIES = 10
const BASE_DELAY_MS = 500
// Per-session sequential wrappers to prevent concurrent log writes
const sequentialAppendBySession: Map<
string,
(
entry: TranscriptMessage,
url: string,
headers: Record<string, string>,
) => Promise<boolean>
> = new Map()
/**
* Gets or creates a sequential wrapper for a session
* This ensures that log appends for a session are processed one at a time
*/
function getOrCreateSequentialAppend(sessionId: string) {
let sequentialAppend = sequentialAppendBySession.get(sessionId)
if (!sequentialAppend) {
sequentialAppend = sequential(
async (
entry: TranscriptMessage,
url: string,
headers: Record<string, string>,
) => await appendSessionLogImpl(sessionId, entry, url, headers),
)
sequentialAppendBySession.set(sessionId, sequentialAppend)
}
return sequentialAppend
}
/**
* Internal implementation of appendSessionLog with retry logic
* Retries on transient errors (network, 5xx, 429). On 409, adopts the server's
* last UUID and retries (handles stale state from killed process's in-flight
* requests). Fails immediately on 401.
*/
async function appendSessionLogImpl(
sessionId: string,
entry: TranscriptMessage,
url: string,
headers: Record<string, string>,
): Promise<boolean> {
for (let attempt = 1; attempt <= MAX_RETRIES; attempt++) {
try {
const lastUuid = lastUuidMap.get(sessionId)
const requestHeaders = { ...headers }
if (lastUuid) {
requestHeaders['Last-Uuid'] = lastUuid
}
const response = await axios.put(url, entry, {
headers: requestHeaders,
validateStatus: status => status < 500,
})
if (response.status === 200 || response.status === 201) {
lastUuidMap.set(sessionId, entry.uuid)
logForDebugging(
`Successfully persisted session log entry for session ${sessionId}`,
)
return true
}
if (response.status === 409) {
// Check if our entry was actually stored (server returned 409 but entry exists)
// This handles the scenario where entry was stored but client received an error
// response, causing lastUuidMap to be stale
const serverLastUuid = response.headers['x-last-uuid']
if (serverLastUuid === entry.uuid) {
// Our entry IS the last entry on server - it was stored successfully previously
lastUuidMap.set(sessionId, entry.uuid)
logForDebugging(
`Session entry ${entry.uuid} already present on server, recovering from stale state`,
)
logForDiagnosticsNoPII('info', 'session_persist_recovered_from_409')
return true
}
// Another writer (e.g. in-flight request from a killed process)
// advanced the server's chain. Try to adopt the server's last UUID
// from the response header, or re-fetch the session to discover it.
if (serverLastUuid) {
lastUuidMap.set(sessionId, serverLastUuid as UUID)
logForDebugging(
`Session 409: adopting server lastUuid=${serverLastUuid} from header, retrying entry ${entry.uuid}`,
)
} else {
// Server didn't return x-last-uuid (e.g. v1 endpoint). Re-fetch
// the session to discover the current head of the append chain.
const logs = await fetchSessionLogsFromUrl(sessionId, url, headers)
const adoptedUuid = findLastUuid(logs)
if (adoptedUuid) {
lastUuidMap.set(sessionId, adoptedUuid)
logForDebugging(
`Session 409: re-fetched ${logs!.length} entries, adopting lastUuid=${adoptedUuid}, retrying entry ${entry.uuid}`,
)
} else {
// Can't determine server state — give up
const errorData = response.data as SessionIngressError
const errorMessage =
errorData.error?.message || 'Concurrent modification detected'
logError(
new Error(
`Session persistence conflict: UUID mismatch for session ${sessionId}, entry ${entry.uuid}. ${errorMessage}`,
),
)
logForDiagnosticsNoPII(
'error',
'session_persist_fail_concurrent_modification',
)
return false
}
}
logForDiagnosticsNoPII('info', 'session_persist_409_adopt_server_uuid')
continue // retry with updated lastUuid
}
if (response.status === 401) {
logForDebugging('Session token expired or invalid')
logForDiagnosticsNoPII('error', 'session_persist_fail_bad_token')
return false // Non-retryable
}
// Other 4xx (429, etc.) - retryable
logForDebugging(
`Failed to persist session log: ${response.status} ${response.statusText}`,
)
logForDiagnosticsNoPII('error', 'session_persist_fail_status', {
status: response.status,
attempt,
})
} catch (error) {
// Network errors, 5xx - retryable
const axiosError = error as AxiosError<SessionIngressError>
logError(new Error(`Error persisting session log: ${axiosError.message}`))
logForDiagnosticsNoPII('error', 'session_persist_fail_status', {
status: axiosError.status,
attempt,
})
}
if (attempt === MAX_RETRIES) {
logForDebugging(`Remote persistence failed after ${MAX_RETRIES} attempts`)
logForDiagnosticsNoPII(
'error',
'session_persist_error_retries_exhausted',
{ attempt },
)
return false
}
const delayMs = Math.min(BASE_DELAY_MS * Math.pow(2, attempt - 1), 8000)
logForDebugging(
`Remote persistence attempt ${attempt}/${MAX_RETRIES} failed, retrying in ${delayMs}ms…`,
)
await sleep(delayMs)
}
return false
}
/**
* Append a log entry to the session using JWT token
* Uses optimistic concurrency control with Last-Uuid header
* Ensures sequential execution per session to prevent race conditions
*/
export async function appendSessionLog(
sessionId: string,
entry: TranscriptMessage,
url: string,
): Promise<boolean> {
const sessionToken = getSessionIngressAuthToken()
if (!sessionToken) {
logForDebugging('No session token available for session persistence')
logForDiagnosticsNoPII('error', 'session_persist_fail_jwt_no_token')
return false
}
const headers: Record<string, string> = {
Authorization: `Bearer ${sessionToken}`,
'Content-Type': 'application/json',
}
const sequentialAppend = getOrCreateSequentialAppend(sessionId)
return sequentialAppend(entry, url, headers)
}
/**
* Get all session logs for hydration
*/
export async function getSessionLogs(
sessionId: string,
url: string,
): Promise<Entry[] | null> {
const sessionToken = getSessionIngressAuthToken()
if (!sessionToken) {
logForDebugging('No session token available for fetching session logs')
logForDiagnosticsNoPII('error', 'session_get_fail_no_token')
return null
}
const headers = { Authorization: `Bearer ${sessionToken}` }
const logs = await fetchSessionLogsFromUrl(sessionId, url, headers)
if (logs && logs.length > 0) {
// Update our lastUuid to the last entry's UUID
const lastEntry = logs.at(-1)
if (lastEntry && 'uuid' in lastEntry && lastEntry.uuid) {
lastUuidMap.set(sessionId, lastEntry.uuid)
}
}
return logs
}
/**
* Get all session logs for hydration via OAuth
* Used for teleporting sessions from the Sessions API
*/
export async function getSessionLogsViaOAuth(
sessionId: string,
accessToken: string,
orgUUID: string,
): Promise<Entry[] | null> {
const url = `${getOauthConfig().BASE_API_URL}/v1/session_ingress/session/${sessionId}`
logForDebugging(`[session-ingress] Fetching session logs from: ${url}`)
const headers = {
...getOAuthHeaders(accessToken),
'x-organization-uuid': orgUUID,
}
const result = await fetchSessionLogsFromUrl(sessionId, url, headers)
return result
}
/**
* Response shape from GET /v1/code/sessions/{id}/teleport-events.
* WorkerEvent.payload IS the Entry (TranscriptMessage struct) — the CLI
* writes it via AddWorkerEvent, the server stores it opaque, we read it
* back here.
*/
type TeleportEventsResponse = {
data: Array<{
event_id: string
event_type: string
is_compaction: boolean
payload: Entry | null
created_at: string
}>
// Unset when there are no more pages — this IS the end-of-stream
// signal (no separate has_more field).
next_cursor?: string
}
/**
* Get worker events (transcript) via the CCR v2 Sessions API. Replaces
* getSessionLogsViaOAuth once session-ingress is retired.
*
* The server dispatches per-session: Spanner for v2-native sessions,
* threadstore for pre-backfill session_* IDs. The cursor is opaque to us —
* echo it back until next_cursor is unset.
*
* Paginated (500/page default, server max 1000). session-ingress's one-shot
* 50k is gone; we loop.
*/
export async function getTeleportEvents(
sessionId: string,
accessToken: string,
orgUUID: string,
): Promise<Entry[] | null> {
const baseUrl = `${getOauthConfig().BASE_API_URL}/v1/code/sessions/${sessionId}/teleport-events`
const headers = {
...getOAuthHeaders(accessToken),
'x-organization-uuid': orgUUID,
}
logForDebugging(`[teleport] Fetching events from: ${baseUrl}`)
const all: Entry[] = []
let cursor: string | undefined
let pages = 0
// Infinite-loop guard: 1000/page × 100 pages = 100k events. Larger than
// session-ingress's 50k one-shot. If we hit this, something's wrong
// (server not advancing cursor) — bail rather than hang.
const maxPages = 100
while (pages < maxPages) {
const params: Record<string, string | number> = { limit: 1000 }
if (cursor !== undefined) {
params.cursor = cursor
}
let response
try {
response = await axios.get<TeleportEventsResponse>(baseUrl, {
headers,
params,
timeout: 20000,
validateStatus: status => status < 500,
})
} catch (e) {
const err = e as AxiosError
logError(new Error(`Teleport events fetch failed: ${err.message}`))
logForDiagnosticsNoPII('error', 'teleport_events_fetch_fail')
return null
}
if (response.status === 404) {
// 404 on page 0 is ambiguous during the migration window:
// (a) Session genuinely not found (not in Spanner AND not in
// threadstore) — nothing to fetch.
// (b) Route-level 404: endpoint not deployed yet, or session is
// a threadstore session not yet backfilled into Spanner.
// We can't tell them apart from the response alone. Returning null
// lets the caller fall back to session-ingress, which will correctly
// return empty for case (a) and data for case (b). Once the backfill
// is complete and session-ingress is gone, the fallback also returns
// null → same "Failed to fetch session logs" error as today.
//
// 404 mid-pagination (pages > 0) means session was deleted between
// pages — return what we have.
logForDebugging(
`[teleport] Session ${sessionId} not found (page ${pages})`,
)
logForDiagnosticsNoPII('warn', 'teleport_events_not_found')
return pages === 0 ? null : all
}
if (response.status === 401) {
logForDiagnosticsNoPII('error', 'teleport_events_bad_token')
throw new Error(
'Your session has expired. Please run /login to sign in again.',
)
}
if (response.status !== 200) {
logError(
new Error(
`Teleport events returned ${response.status}: ${jsonStringify(response.data)}`,
),
)
logForDiagnosticsNoPII('error', 'teleport_events_bad_status')
return null
}
const { data, next_cursor } = response.data
if (!Array.isArray(data)) {
logError(
new Error(
`Teleport events invalid response shape: ${jsonStringify(response.data)}`,
),
)
logForDiagnosticsNoPII('error', 'teleport_events_invalid_shape')
return null
}
// payload IS the Entry. null payload happens for threadstore non-generic
// events (server skips them) or encryption failures — skip here too.
for (const ev of data) {
if (ev.payload !== null) {
all.push(ev.payload)
}
}
pages++
// == null covers both `null` and `undefined` — the proto omits the
// field at end-of-stream, but some serializers emit `null`. Strict
// `=== undefined` would loop forever on `null` (cursor=null in query
// params stringifies to "null", which the server rejects or echoes).
if (next_cursor == null) {
break
}
cursor = next_cursor
}
if (pages >= maxPages) {
// Don't fail — return what we have. Better to teleport with a
// truncated transcript than not at all.
logError(
new Error(`Teleport events hit page cap (${maxPages}) for ${sessionId}`),
)
logForDiagnosticsNoPII('warn', 'teleport_events_page_cap')
}
logForDebugging(
`[teleport] Fetched ${all.length} events over ${pages} page(s) for ${sessionId}`,
)
return all
}
/**
* Shared implementation for fetching session logs from a URL
*/
async function fetchSessionLogsFromUrl(
sessionId: string,
url: string,
headers: Record<string, string>,
): Promise<Entry[] | null> {
try {
const response = await axios.get(url, {
headers,
timeout: 20000,
validateStatus: status => status < 500,
params: isEnvTruthy(process.env.CLAUDE_AFTER_LAST_COMPACT)
? { after_last_compact: true }
: undefined,
})
if (response.status === 200) {
const data = response.data
// Validate the response structure
if (!data || typeof data !== 'object' || !Array.isArray(data.loglines)) {
logError(
new Error(
`Invalid session logs response format: ${jsonStringify(data)}`,
),
)
logForDiagnosticsNoPII('error', 'session_get_fail_invalid_response')
return null
}
const logs = data.loglines as Entry[]
logForDebugging(
`Fetched ${logs.length} session logs for session ${sessionId}`,
)
return logs
}
if (response.status === 404) {
logForDebugging(`No existing logs for session ${sessionId}`)
logForDiagnosticsNoPII('warn', 'session_get_no_logs_for_session')
return []
}
if (response.status === 401) {
logForDebugging('Auth token expired or invalid')
logForDiagnosticsNoPII('error', 'session_get_fail_bad_token')
throw new Error(
'Your session has expired. Please run /login to sign in again.',
)
}
logForDebugging(
`Failed to fetch session logs: ${response.status} ${response.statusText}`,
)
logForDiagnosticsNoPII('error', 'session_get_fail_status', {
status: response.status,
})
return null
} catch (error) {
const axiosError = error as AxiosError<SessionIngressError>
logError(new Error(`Error fetching session logs: ${axiosError.message}`))
logForDiagnosticsNoPII('error', 'session_get_fail_status', {
status: axiosError.status,
})
return null
}
}
/**
* Walk backward through entries to find the last one with a uuid.
* Some entry types (SummaryMessage, TagMessage) don't have one.
*/
function findLastUuid(logs: Entry[] | null): UUID | undefined {
if (!logs) {
return undefined
}
const entry = logs.findLast(e => 'uuid' in e && e.uuid)
return entry && 'uuid' in entry ? (entry.uuid as UUID) : undefined
}
/**
* Clear cached state for a session
*/
export function clearSession(sessionId: string): void {
lastUuidMap.delete(sessionId)
sequentialAppendBySession.delete(sessionId)
}
/**
* Clear all cached session state (all sessions).
* Use this on /clear to free sub-agent session entries.
*/
export function clearAllSessions(): void {
lastUuidMap.clear()
sequentialAppendBySession.clear()
}