Skip to content

Commit a439055

Browse files
feat(microservices): add kafka retriable exception, rethrow if needed
1 parent 37d83ee commit a439055

9 files changed

Lines changed: 49 additions & 79 deletions

File tree

packages/microservices/context/kafka-rpc-proxy.ts

Lines changed: 0 additions & 12 deletions
This file was deleted.

packages/microservices/context/rpc-proxy.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { ExecutionContextHost } from '@nestjs/core/helpers/execution-context-host';
2-
import { Observable, isObservable } from 'rxjs';
2+
import { isObservable, Observable } from 'rxjs';
33
import { catchError } from 'rxjs/operators';
44
import { RpcExceptionsHandler } from '../exceptions/rpc-exceptions-handler';
55

packages/microservices/ctx-host/kafka.context.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ export class KafkaContext extends BaseRpcContext<KafkaContextArgs> {
3535
}
3636

3737
/**
38-
* Commit the offset of this message.
38+
* Returns the Kafka consumer reference.
3939
*/
4040
getConsumer() {
4141
return this.args[3];
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
export * from './base-rpc-exception-filter';
2+
export * from './kafka-retriable-exception';
23
export * from './rpc-exception';
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import { RpcException } from './rpc-exception';
2+
3+
/**
4+
* Exception that instructs Kafka driver to instead of introspecting
5+
* error processing flow and sending serialized error message to the consumer,
6+
* force bubble it up to the "eachMessage" callback of the underlying "kafkajs" package
7+
* (even if interceptors are applied, or an observable stream is returned from the message handler).
8+
*
9+
* A transient exception that if retried may succeed.
10+
*
11+
* @publicApi
12+
*/
13+
export class KafkaRetriableException extends RpcException {
14+
public getError(): string | object {
15+
return this;
16+
}
17+
}

packages/microservices/microservices-module.ts

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,8 @@ import { PipesContextCreator } from '@nestjs/core/pipes/pipes-context-creator';
1313
import { ClientProxyFactory } from './client';
1414
import { ClientsContainer } from './container';
1515
import { ExceptionFiltersContext } from './context/exception-filters-context';
16-
import { KafkaRpcProxy } from './context/kafka-rpc-proxy';
1716
import { RpcContextCreator } from './context/rpc-context-creator';
1817
import { RpcProxy } from './context/rpc-proxy';
19-
import { Transport } from './enums';
2018
import { CustomTransportStrategy } from './interfaces';
2119
import { ListenersController } from './listeners-controller';
2220
import { Server } from './server/server';
@@ -25,19 +23,13 @@ export class MicroservicesModule {
2523
private readonly clientsContainer = new ClientsContainer();
2624
private listenersController: ListenersController;
2725

28-
public register(
29-
container: NestContainer,
30-
config: ApplicationConfig,
31-
transport?: Transport,
32-
) {
33-
const rpcProxy =
34-
transport === Transport.KAFKA ? new KafkaRpcProxy() : new RpcProxy();
26+
public register(container: NestContainer, config: ApplicationConfig) {
3527
const exceptionFiltersContext = new ExceptionFiltersContext(
3628
container,
3729
config,
3830
);
3931
const contextCreator = new RpcContextCreator(
40-
rpcProxy,
32+
new RpcProxy(),
4133
exceptionFiltersContext,
4234
new PipesContextCreator(container, config),
4335
new PipesConsumer(),

packages/microservices/nest-microservice.ts

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,7 @@ export class NestMicroservice
4646
) {
4747
super(container);
4848

49-
this.microservicesModule.register(
50-
container,
51-
this.applicationConfig,
52-
// @ts-expect-error transport does not exist on type of config??
53-
config.transport,
54-
);
49+
this.microservicesModule.register(container, this.applicationConfig);
5550
this.createServer(config);
5651
this.selectContextModule();
5752
}

packages/microservices/server/server-kafka.ts

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { Logger } from '@nestjs/common/services/logger.service';
22
import { isNil } from '@nestjs/common/utils/shared.utils';
3+
import { concat, firstValueFrom, of, skip, throwError } from 'rxjs';
34
import {
45
KAFKA_DEFAULT_BROKER,
56
KAFKA_DEFAULT_CLIENT,
@@ -8,6 +9,7 @@ import {
89
} from '../constants';
910
import { KafkaContext } from '../ctx-host';
1011
import { KafkaHeaders, Transport } from '../enums';
12+
import { KafkaRetriableException } from '../exceptions';
1113
import {
1214
BrokersFunction,
1315
Consumer,
@@ -182,10 +184,27 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
182184
});
183185
}
184186

187+
// @TODO pass packet.data.value (overide deserializer)
185188
const response$ = this.transformToObservable(
186189
await handler(packet.data, kafkaContext),
187190
);
188-
response$ && this.send(response$, publish);
191+
192+
const [isError, valueOrError] = await firstValueFrom(response$)
193+
.then(value => [false, value])
194+
.catch(error => {
195+
if (error instanceof KafkaRetriableException) {
196+
throw error;
197+
}
198+
return [true, error];
199+
});
200+
201+
this.send(
202+
concat(
203+
isError ? throwError(() => valueOrError) : of(valueOrError),
204+
response$.pipe(skip(1)),
205+
),
206+
publish,
207+
);
189208
}
190209

191210
public async sendMessage(
@@ -227,9 +246,12 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
227246
if (!outgoingResponse.err) {
228247
return;
229248
}
230-
outgoingMessage.headers[KafkaHeaders.NEST_ERR] = Buffer.from(
231-
outgoingResponse.err,
232-
);
249+
const stringifiedError =
250+
typeof outgoingResponse.err === 'object'
251+
? JSON.stringify(outgoingResponse.err)
252+
: outgoingResponse.err;
253+
outgoingMessage.headers[KafkaHeaders.NEST_ERR] =
254+
Buffer.from(stringifiedError);
233255
}
234256

235257
public assignCorrelationIdHeader(

packages/microservices/test/context/kafka-rpc-proxy.spec.ts

Lines changed: 0 additions & 45 deletions
This file was deleted.

0 commit comments

Comments
 (0)