Skip to content

Commit cd3a9d6

Browse files
fix(microservices): fix amqp v4 incompatibility issue
1 parent 30b75a5 commit cd3a9d6

3 files changed

Lines changed: 8 additions & 2 deletions

File tree

packages/microservices/client/client-rmq.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
import { Logger } from '@nestjs/common/services/logger.service';
22
import { loadPackage } from '@nestjs/common/utils/load-package.util';
33
import { randomStringGenerator } from '@nestjs/common/utils/random-string-generator.util';
4+
import { isUndefined } from '@nestjs/common/utils/shared.utils';
45
import { EventEmitter } from 'events';
56
import { EmptyError, fromEvent, lastValueFrom, merge, Observable } from 'rxjs';
67
import { first, map, share, switchMap } from 'rxjs/operators';
78
import {
9+
CONNECT_FAILED_EVENT,
810
DISCONNECTED_RMQ_MESSAGE,
911
DISCONNECT_EVENT,
1012
ERROR_EVENT,
@@ -110,7 +112,11 @@ export class ClientRMQ extends ClientProxy {
110112
instance: any,
111113
source$: Observable<T>,
112114
): Observable<T> {
113-
const close$ = fromEvent(instance, DISCONNECT_EVENT).pipe(
115+
const connectFailedEvent = isUndefined(instance.connectionAttempts)
116+
? DISCONNECT_EVENT
117+
: CONNECT_FAILED_EVENT;
118+
119+
const close$ = fromEvent(instance, connectFailedEvent).pipe(
114120
map((err: any) => {
115121
throw err;
116122
}),

packages/microservices/constants.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ export const KAFKA_DEFAULT_BROKER = 'localhost:9092';
1111

1212
export const CONNECT_EVENT = 'connect';
1313
export const DISCONNECT_EVENT = 'disconnect';
14+
export const CONNECT_FAILED_EVENT = 'connectFailed';
1415
export const MESSAGE_EVENT = 'message';
1516
export const DATA_EVENT = 'data';
1617
export const ERROR_EVENT = 'error';

packages/microservices/server/server-rmq.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ import {
33
isString,
44
isUndefined,
55
} from '@nestjs/common/utils/shared.utils';
6-
import { Observable } from 'rxjs';
76
import {
87
CONNECT_EVENT,
98
DISCONNECTED_RMQ_MESSAGE,

0 commit comments

Comments
 (0)