Skip to content

Commit b7378ad

Browse files
Merge pull request nestjs#8740 from wSedlacek/feat/handle-observable-stream-errors
feat(core): handle errors in observable streams
2 parents f704a43 + b7f3680 commit b7378ad

3 files changed

Lines changed: 113 additions & 19 deletions

File tree

packages/core/router/router-execution-context.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -431,7 +431,7 @@ export class RouterExecutionContext {
431431
}
432432
const isSseHandler = !!this.reflectSse(callback);
433433
if (isSseHandler) {
434-
return async <
434+
return <
435435
TResult extends Observable<unknown> = any,
436436
TResponse extends HeaderStream = any,
437437
TRequest extends IncomingMessage = any,
@@ -440,7 +440,7 @@ export class RouterExecutionContext {
440440
res: TResponse,
441441
req: TRequest,
442442
) => {
443-
await this.responseController.sse(
443+
this.responseController.sse(
444444
result,
445445
(res as any).raw || res,
446446
(req as any).raw || req,

packages/core/router/router-response-controller.ts

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,14 @@
1-
import { HttpServer, HttpStatus, RequestMethod } from '@nestjs/common';
1+
import {
2+
HttpServer,
3+
HttpStatus,
4+
Logger,
5+
RequestMethod,
6+
MessageEvent,
7+
} from '@nestjs/common';
28
import { isFunction, isObject } from '@nestjs/common/utils/shared.utils';
39
import { IncomingMessage } from 'http';
4-
import { lastValueFrom, Observable } from 'rxjs';
5-
import { debounce } from 'rxjs/operators';
10+
import { EMPTY, lastValueFrom, Observable } from 'rxjs';
11+
import { catchError, debounce, map } from 'rxjs/operators';
612
import {
713
AdditionalHeaders,
814
WritableHeaderStream,
@@ -20,6 +26,8 @@ export interface RedirectResponse {
2026
}
2127

2228
export class RouterResponseController {
29+
private readonly logger = new Logger(RouterResponseController.name);
30+
2331
constructor(private readonly applicationRef: HttpServer) {}
2432

2533
public async apply<TInput = any, TResponse = any>(
@@ -87,7 +95,7 @@ export class RouterResponseController {
8795
this.applicationRef.status(response, statusCode);
8896
}
8997

90-
public async sse<
98+
public sse<
9199
TInput extends Observable<unknown> = any,
92100
TResponse extends WritableHeaderStream = any,
93101
TRequest extends IncomingMessage = any,
@@ -109,15 +117,29 @@ export class RouterResponseController {
109117

110118
const subscription = result
111119
.pipe(
120+
map((message): MessageEvent => {
121+
if (isObject(message)) {
122+
return message as MessageEvent;
123+
}
124+
125+
return { data: message as object | string };
126+
}),
112127
debounce(
113-
(message: any) =>
114-
new Promise(resolve => {
115-
if (!isObject(message)) {
116-
message = { data: message };
117-
}
118-
stream.writeMessage(message, resolve);
119-
}),
128+
message =>
129+
new Promise<void>(resolve =>
130+
stream.writeMessage(message, () => resolve()),
131+
),
120132
),
133+
catchError(err => {
134+
const data = err instanceof Error ? err.message : err;
135+
stream.writeMessage({ type: 'error', data }, writeError => {
136+
if (writeError) {
137+
this.logger.error(writeError);
138+
}
139+
});
140+
141+
return EMPTY;
142+
}),
121143
)
122144
.subscribe({
123145
complete: () => {

packages/core/test/router/router-response-controller.spec.ts

Lines changed: 78 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { isNil, isObject } from '@nestjs/common/utils/shared.utils';
22
import { expect } from 'chai';
33
import { IncomingMessage, ServerResponse } from 'http';
4-
import { Observable, of } from 'rxjs';
4+
import { Observable, of, Subject } from 'rxjs';
55
import * as sinon from 'sinon';
66
import { PassThrough, Writable } from 'stream';
77
import { HttpStatus, RequestMethod } from '../../../common';
@@ -349,14 +349,86 @@ data: test
349349
const request = new Writable();
350350
request._write = () => {};
351351

352-
routerResponseController.sse(
353-
result as unknown as Observable<string>,
354-
response as unknown as ServerResponse,
355-
request as unknown as IncomingMessage,
356-
);
352+
try {
353+
routerResponseController.sse(
354+
result as unknown as Observable<string>,
355+
response as unknown as ServerResponse,
356+
request as unknown as IncomingMessage,
357+
);
358+
} catch {
359+
// Wether an error is thrown or not
360+
// is not relevant, so long as
361+
// result is not called
362+
}
357363

358364
sinon.assert.notCalled(result);
359365
done();
360366
});
367+
368+
describe('when there is an error', () => {
369+
it('should close the request', done => {
370+
const result = new Subject();
371+
const response = new Writable();
372+
response.end = done;
373+
response._write = () => {};
374+
375+
const request = new Writable();
376+
request._write = () => {};
377+
378+
routerResponseController.sse(
379+
result,
380+
response as unknown as ServerResponse,
381+
request as unknown as IncomingMessage,
382+
);
383+
384+
result.error(new Error('Some error'));
385+
});
386+
387+
it('should write the error message to the stream', async () => {
388+
class Sink extends Writable {
389+
private readonly chunks: string[] = [];
390+
391+
_write(
392+
chunk: any,
393+
encoding: string,
394+
callback: (error?: Error | null) => void,
395+
): void {
396+
this.chunks.push(chunk);
397+
callback();
398+
}
399+
400+
get content() {
401+
return this.chunks.join('');
402+
}
403+
}
404+
405+
const written = (stream: Writable) =>
406+
new Promise((resolve, reject) =>
407+
stream.on('error', reject).on('finish', resolve),
408+
);
409+
410+
const result = new Subject();
411+
const response = new Sink();
412+
const request = new PassThrough();
413+
routerResponseController.sse(
414+
result,
415+
response as unknown as ServerResponse,
416+
request as unknown as IncomingMessage,
417+
);
418+
419+
result.error(new Error('Some error'));
420+
request.destroy();
421+
422+
await written(response);
423+
expect(response.content).to.eql(
424+
`:
425+
event: error
426+
id: 1
427+
data: Some error
428+
429+
`,
430+
);
431+
});
432+
});
361433
});
362434
});

0 commit comments

Comments
 (0)