Skip to content

Commit cbec376

Browse files
committed
feat: use isObervable from rxjs instead of our own version
Relying on `isObservable` type guard from RxJS is better as it has less false-positives.
1 parent 7825c02 commit cbec376

6 files changed

Lines changed: 15 additions & 52 deletions

File tree

packages/core/helpers/external-context-creator.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import {
66
PipeTransform,
77
} from '@nestjs/common/interfaces';
88
import { isEmpty, isFunction } from '@nestjs/common/utils/shared.utils';
9-
import { lastValueFrom } from 'rxjs';
9+
import { lastValueFrom, isObservable } from 'rxjs';
1010
import { ExternalExceptionFilterContext } from '../exceptions/external-exception-filter-context';
1111
import { FORBIDDEN_MESSAGE } from '../guards/constants';
1212
import { GuardsConsumer } from '../guards/guards-consumer';
@@ -329,7 +329,7 @@ export class ExternalContextCreator {
329329
}
330330

331331
public async transformToResult(resultOrDeferred: any) {
332-
if (resultOrDeferred && isFunction(resultOrDeferred.subscribe)) {
332+
if (isObservable(resultOrDeferred)) {
333333
return lastValueFrom(resultOrDeferred);
334334
}
335335
return resultOrDeferred;

packages/microservices/context/rpc-proxy.ts

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
import { isFunction } from '@nestjs/common/utils/shared.utils';
21
import { ExecutionContextHost } from '@nestjs/core/helpers/execution-context-host';
3-
import { Observable } from 'rxjs';
2+
import { Observable, isObservable } from 'rxjs';
43
import { catchError } from 'rxjs/operators';
54
import { RpcExceptionsHandler } from '../exceptions/rpc-exceptions-handler';
65

@@ -12,7 +11,7 @@ export class RpcProxy {
1211
return async (...args: unknown[]) => {
1312
try {
1413
const result = await targetCallback(...args);
15-
return !this.isObservable(result)
14+
return !isObservable(result)
1615
? result
1716
: result.pipe(
1817
catchError(error =>
@@ -34,8 +33,4 @@ export class RpcProxy {
3433
host.setType('rpc');
3534
return exceptionsHandler.handle(error, host);
3635
}
37-
38-
isObservable(result: any): boolean {
39-
return result && isFunction(result.subscribe);
40-
}
4136
}

packages/microservices/server/server.ts

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import { Logger, LoggerService } from '@nestjs/common/services/logger.service';
22
import { loadPackage } from '@nestjs/common/utils/load-package.util';
3-
import { isFunction } from '@nestjs/common/utils/shared.utils';
43
import {
54
connectable,
65
EMPTY as empty,
76
from as fromPromise,
7+
isObservable,
88
Observable,
99
of,
1010
Subject,
@@ -113,7 +113,7 @@ export abstract class Server {
113113
);
114114
}
115115
const resultOrStream = await handler(packet.data, context);
116-
if (this.isObservable(resultOrStream)) {
116+
if (isObservable(resultOrStream)) {
117117
const connectableSource = connectable(resultOrStream, {
118118
connector: () => new Subject(),
119119
resetOnDisconnect: false,
@@ -125,10 +125,13 @@ export abstract class Server {
125125
public transformToObservable<T = any>(resultOrDeferred: any): Observable<T> {
126126
if (resultOrDeferred instanceof Promise) {
127127
return fromPromise(resultOrDeferred);
128-
} else if (!this.isObservable(resultOrDeferred)) {
129-
return of(resultOrDeferred);
130128
}
131-
return resultOrDeferred;
129+
130+
if (isObservable(resultOrDeferred)) {
131+
return resultOrDeferred as Observable<T>;
132+
}
133+
134+
return of(resultOrDeferred);
132135
}
133136

134137
public getOptionsProp<
@@ -180,10 +183,6 @@ export abstract class Server {
180183
new IncomingRequestDeserializer();
181184
}
182185

183-
private isObservable(input: unknown): input is Observable<any> {
184-
return input && isFunction((input as Observable<any>).subscribe);
185-
}
186-
187186
/**
188187
* Transforms the server Pattern to valid type and returns a route for him.
189188
*

packages/microservices/test/context/rpc-proxy.spec.ts

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,17 +39,4 @@ describe('RpcProxy', () => {
3939
(await proxy(null, null)).subscribe(null, () => expectation.verify());
4040
});
4141
});
42-
43-
describe('isObservable', () => {
44-
describe('when observable', () => {
45-
it('should return true', () => {
46-
expect(routerProxy.isObservable(of('test'))).to.be.true;
47-
});
48-
});
49-
describe('when not observable', () => {
50-
it('should return false', () => {
51-
expect(routerProxy.isObservable({})).to.be.false;
52-
});
53-
});
54-
});
5542
});

packages/websockets/context/ws-proxy.ts

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
import { isFunction } from '@nestjs/common/utils/shared.utils';
21
import { ExecutionContextHost } from '@nestjs/core/helpers/execution-context-host';
3-
import { empty } from 'rxjs';
2+
import { empty, isObservable } from 'rxjs';
43
import { catchError } from 'rxjs/operators';
54
import { WsExceptionsHandler } from '../exceptions/ws-exceptions-handler';
65

@@ -12,7 +11,7 @@ export class WsProxy {
1211
return async (...args: unknown[]) => {
1312
try {
1413
const result = await targetCallback(...args);
15-
return !this.isObservable(result)
14+
return !isObservable(result)
1615
? result
1716
: result.pipe(
1817
catchError(error => {
@@ -35,8 +34,4 @@ export class WsProxy {
3534
host.setType('ws');
3635
exceptionsHandler.handle(error, host);
3736
}
38-
39-
isObservable(result: any): boolean {
40-
return result && isFunction(result.subscribe);
41-
}
4237
}

packages/websockets/test/context/ws-proxy.spec.ts

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { expect } from 'chai';
2-
import { of, throwError } from 'rxjs';
2+
import { throwError } from 'rxjs';
33
import * as sinon from 'sinon';
44
import { WsProxy } from '../../context/ws-proxy';
55
import { WsException } from '../../errors/ws-exception';
@@ -40,17 +40,4 @@ describe('WsProxy', () => {
4040
(await proxy(null, null)).subscribe(null, () => expectation.verify());
4141
});
4242
});
43-
44-
describe('isObservable', () => {
45-
describe('when observable', () => {
46-
it('should return true', () => {
47-
expect(routerProxy.isObservable(of('test'))).to.be.true;
48-
});
49-
});
50-
describe('when not observable', () => {
51-
it('should return false', () => {
52-
expect(routerProxy.isObservable({})).to.be.false;
53-
});
54-
});
55-
});
5643
});

0 commit comments

Comments
 (0)