Skip to content

Commit e0e1feb

Browse files
committed
test(sse): Test sse
1 parent 98c3c98 commit e0e1feb

14 files changed

Lines changed: 7749 additions & 11896 deletions

package-lock.json

Lines changed: 7411 additions & 11842 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@
116116
"eslint": "7.10.0",
117117
"eslint-config-prettier": "6.11.0",
118118
"eslint-plugin-import": "2.22.1",
119+
"eventsource": "^1.0.7",
119120
"fancy-log": "1.3.3",
120121
"fastify": "3.5.1",
121122
"fastify-cors": "4.1.0",

packages/common/http/index.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
11
export * from './http.module';
22
export * from './http.service';
33
export * from './interfaces';
4-
export { SseStream } from './sse-stream.service';
Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
import { expect } from 'chai';
22
import { Sse } from '../../decorators/http/sse.decorator';
3-
import { HTTP_CODE_METADATA, SSE_METADATA } from '../../constants';
3+
import {
4+
HTTP_CODE_METADATA,
5+
SSE_METADATA,
6+
PATH_METADATA,
7+
METHOD_METADATA,
8+
} from '../../constants';
9+
import { RequestMethod } from '../../enums/request-method.enum';
410

511
describe('@Sse', () => {
612
const prefix = '/prefix';
@@ -10,7 +16,11 @@ describe('@Sse', () => {
1016
}
1117

1218
it('should enhance method with expected http status code', () => {
19+
const path = Reflect.getMetadata(PATH_METADATA, Test.test);
20+
expect(path).to.be.eql('/prefix');
21+
const method = Reflect.getMetadata(METHOD_METADATA, Test.test);
22+
expect(method).to.be.eql(RequestMethod.GET);
1323
const metadata = Reflect.getMetadata(SSE_METADATA, Test.test);
14-
expect(metadata).to.be.eql(prefix);
24+
expect(metadata).to.be.eql(true);
1525
});
1626
});

packages/core/helpers/handler-metadata-storage.ts

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,12 @@ import { ParamProperties } from './context-utils';
66

77
export const HANDLER_METADATA_SYMBOL = Symbol.for('handler_metadata:cache');
88

9+
export type HandleResponseFn = <TResult, TResponse, TRequest>(
10+
result: TResult,
11+
res: TResponse,
12+
req?: TRequest,
13+
) => any;
14+
915
export interface HandlerMetadata {
1016
argsLength: number;
1117
paramtypes: any[];
@@ -17,11 +23,7 @@ export interface HandlerMetadata {
1723
contextId?: ContextId,
1824
inquirerId?: string,
1925
) => (ParamProperties & { metatype?: any })[];
20-
fnHandleResponse: <TResult, TResponse, TRequest>(
21-
result: TResult,
22-
res: TResponse,
23-
req?: TRequest,
24-
) => any;
26+
fnHandleResponse: HandleResponseFn;
2527
}
2628

2729
export class HandlerMetadataStorage<

packages/core/router/router-execution-context.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import { ExecutionContextHost } from '../helpers/execution-context-host';
2727
import {
2828
HandlerMetadata,
2929
HandlerMetadataStorage,
30+
HandleResponseFn,
3031
} from '../helpers/handler-metadata-storage';
3132
import { STATIC_CONTEXT } from '../injector/constants';
3233
import { InterceptorsConsumer } from '../interceptors/interceptors-consumer';
@@ -403,7 +404,7 @@ export class RouterExecutionContext {
403404
isResponseHandled: boolean,
404405
redirectResponse?: RedirectResponse,
405406
httpStatusCode?: number,
406-
) {
407+
): HandleResponseFn {
407408
const renderTemplate = this.reflectRenderTemplate(callback);
408409
if (renderTemplate) {
409410
return async <TResult, TResponse>(result: TResult, res: TResponse) => {
@@ -420,7 +421,7 @@ export class RouterExecutionContext {
420421
return async <TResult, TResponse, TRequest>(
421422
result: TResult,
422423
res: TResponse,
423-
req: TRequest,
424+
req?: TRequest,
424425
) => {
425426
await this.responseController.sse(result, res, req);
426427
};

packages/core/router/router-response-controller.ts

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@ import {
22
HttpServer,
33
HttpStatus,
44
RequestMethod,
5-
SseStream,
65
MessageEvent,
76
} from '@nestjs/common';
87
import { isFunction } from '@nestjs/common/utils/shared.utils';
98
import { Observable } from 'rxjs';
9+
import { SseStream, HeaderStream } from '../services';
10+
import { IncomingMessage, ServerResponse } from 'http';
1011

1112
export interface CustomHeader {
1213
name: string;
@@ -86,21 +87,14 @@ export class RouterResponseController {
8687
this.applicationRef.status(response, statusCode);
8788
}
8889

89-
public async sse<TInput = unknown, TResponse = unknown, TRequest = unknown>(
90-
result: any,
91-
response: any,
92-
request: any,
93-
) {
94-
if (!isFunction(result.subscribe)) {
95-
throw new ReferenceError(
96-
'You should use an observable to use server-sent events.',
97-
);
98-
}
90+
public async sse<TInput>(result: TInput, response: any, request: any) {
91+
const observable = this.assertObservable(result);
9992

10093
const stream = new SseStream(request);
10194
stream.pipe(response);
10295

103-
const subscription = result.subscribe((message: MessageEvent) => {
96+
const subscription = observable.subscribe((message: any) => {
97+
if (typeof message !== 'object') message = { data: message };
10498
stream.writeMessage(message);
10599
});
106100

@@ -109,4 +103,14 @@ export class RouterResponseController {
109103
subscription.unsubscribe();
110104
});
111105
}
106+
107+
private assertObservable(result: any): Observable<unknown> {
108+
if (!isFunction(result.subscribe)) {
109+
throw new ReferenceError(
110+
'You should use an observable to use server-sent events.',
111+
);
112+
}
113+
114+
return result;
115+
}
112116
}

packages/core/services/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
export * from './reflector.service';
2+
export * from './sse-stream.service';

packages/common/http/sse-stream.service.ts renamed to packages/core/services/sse-stream.service.ts

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { Transform } from 'stream';
22
import { IncomingMessage, OutgoingHttpHeaders } from 'http';
3-
import { MessageEvent } from '../interfaces';
3+
import { MessageEvent } from '@nestjs/common/interfaces';
44

55
function toDataString(data: string | object): string {
66
if (typeof data === 'object') return toDataString(JSON.stringify(data));
@@ -11,7 +11,12 @@ function toDataString(data: string | object): string {
1111
}
1212

1313
interface WriteHeaders {
14-
writeHead?(statusCode: number, headers?: OutgoingHttpHeaders): WriteHeaders;
14+
writeHead?(
15+
statusCode: number,
16+
reasonPhrase?: string,
17+
headers?: OutgoingHttpHeaders,
18+
): void;
19+
writeHead?(statusCode: number, headers?: OutgoingHttpHeaders): void;
1520
flushHeaders?(): void;
1621
}
1722

@@ -35,7 +40,7 @@ export class SseStream extends Transform {
3540

3641
constructor(req?: IncomingMessage) {
3742
super({ objectMode: true });
38-
if (req) {
43+
if (req && req.socket) {
3944
req.socket.setKeepAlive(true);
4045
req.socket.setNoDelay(true);
4146
req.socket.setTimeout(0);
@@ -45,15 +50,22 @@ export class SseStream extends Transform {
4550
pipe<T extends HeaderStream>(destination: T, options?: { end?: boolean }): T {
4651
if (destination.writeHead) {
4752
destination.writeHead(200, {
48-
'Content-Type': 'text/event-stream; charset=utf-8',
49-
'Transfer-Encoding': 'identity',
50-
'Cache-Control': 'no-cache',
53+
// See https://github.com/dunglas/mercure/blob/master/hub/subscribe.go#L124-L130
54+
'Content-Type': 'text/event-stream',
5155
Connection: 'keep-alive',
56+
// Disable cache, even for old browsers and proxies
57+
'Cache-Control':
58+
'private, no-cache, no-store, must-revalidate, max-age=0',
59+
'Transfer-Encoding': 'identity',
60+
Pragma: 'no-cache',
61+
Expire: '0',
62+
// NGINX support https://www.nginx.com/resources/wiki/start/topics/examples/x-accel/#x-accel-buffering
63+
'X-Accel-Buffering': 'no',
5264
});
5365
destination.flushHeaders();
5466
}
5567

56-
destination.write(':ok\n');
68+
destination.write(':\n');
5769
return super.pipe(destination, options);
5870
}
5971

@@ -76,12 +88,8 @@ export class SseStream extends Transform {
7688
cb?: (error: Error | null | undefined) => void,
7789
): boolean {
7890
if (!message.id) {
79-
this.lastEventId = this.lastEventId === null ? 0 : this.lastEventId + 1;
80-
message.id = '' + this.lastEventId;
81-
}
82-
83-
if (!message.type) {
84-
message.type = 'message';
91+
this.lastEventId++;
92+
message.id = this.lastEventId.toString();
8593
}
8694

8795
return this.write(message, encoding, cb);

packages/core/test/router/router-execution-context.spec.ts

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { ForbiddenException } from '@nestjs/common/exceptions/forbidden.exception';
22
import { expect } from 'chai';
3+
import { of } from 'rxjs';
34
import * as sinon from 'sinon';
45
import { HttpException, HttpStatus, RouteParamMetadata } from '../../../common';
56
import { CUSTOM_ROUTE_AGRS_METADATA } from '../../../common/constants';
@@ -17,6 +18,7 @@ import { PipesContextCreator } from '../../pipes/pipes-context-creator';
1718
import { RouteParamsFactory } from '../../router/route-params-factory';
1819
import { RouterExecutionContext } from '../../router/router-execution-context';
1920
import { NoopHttpAdapter } from '../utils/noop-adapter.spec';
21+
import { PassThrough } from 'stream';
2022

2123
describe('RouterExecutionContext', () => {
2224
let contextCreator: RouterExecutionContext;
@@ -326,6 +328,7 @@ describe('RouterExecutionContext', () => {
326328

327329
sinon.stub(contextCreator, 'reflectResponseHeaders').returns([]);
328330
sinon.stub(contextCreator, 'reflectRenderTemplate').returns(undefined);
331+
sinon.stub(contextCreator, 'reflectSse').returns(undefined);
329332

330333
const handler = contextCreator.createHandleResponseFn(
331334
null,
@@ -377,6 +380,7 @@ describe('RouterExecutionContext', () => {
377380

378381
sinon.stub(contextCreator, 'reflectResponseHeaders').returns([]);
379382
sinon.stub(contextCreator, 'reflectRenderTemplate').returns(undefined);
383+
sinon.stub(contextCreator, 'reflectSse').returns(undefined);
380384

381385
const handler = contextCreator.createHandleResponseFn(
382386
null,
@@ -396,6 +400,7 @@ describe('RouterExecutionContext', () => {
396400
const response = {};
397401

398402
sinon.stub(contextCreator, 'reflectRenderTemplate').returns(undefined);
403+
sinon.stub(contextCreator, 'reflectSse').returns(undefined);
399404

400405
const handler = contextCreator.createHandleResponseFn(
401406
null,
@@ -414,5 +419,53 @@ describe('RouterExecutionContext', () => {
414419
).to.be.true;
415420
});
416421
});
422+
423+
describe('when "isSse" is enabled', () => {
424+
it('should use sse-stream.service', async () => {
425+
const result = of('test');
426+
const response = new PassThrough();
427+
response.write = sinon.spy();
428+
const request = new PassThrough();
429+
request.on = sinon.spy();
430+
431+
sinon.stub(contextCreator, 'reflectRenderTemplate').returns(undefined);
432+
sinon.stub(contextCreator, 'reflectSse').returns('/');
433+
434+
const handler = contextCreator.createHandleResponseFn(
435+
null,
436+
true,
437+
undefined,
438+
200,
439+
);
440+
await handler(result, response, request);
441+
442+
expect((response.write as any).called).to.be.true;
443+
expect((request.on as any).called).to.be.true;
444+
});
445+
446+
it('should not allow a non-observable result', async () => {
447+
const result = Promise.resolve('test');
448+
const response = new PassThrough();
449+
const request = new PassThrough();
450+
451+
sinon.stub(contextCreator, 'reflectRenderTemplate').returns(undefined);
452+
sinon.stub(contextCreator, 'reflectSse').returns('/');
453+
454+
const handler = contextCreator.createHandleResponseFn(
455+
null,
456+
true,
457+
undefined,
458+
200,
459+
);
460+
461+
try {
462+
await handler(result, response, request);
463+
} catch (e) {
464+
expect(e.message).to.equal(
465+
'You should use an observable to use server-sent events.',
466+
);
467+
}
468+
});
469+
});
417470
});
418471
});

0 commit comments

Comments
 (0)