Skip to content

Commit 6547f92

Browse files
feat(microservices): add kafka request deserializer, unwrap request
1 parent a439055 commit 6547f92

2 files changed

Lines changed: 26 additions & 1 deletion

File tree

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import { IncomingEvent, IncomingRequest } from '../interfaces';
2+
import { KafkaRequest } from '../serializers/kafka-request.serializer';
3+
import { IncomingRequestDeserializer } from './incoming-request.deserializer';
4+
5+
export class KafkaRequestDeserializer extends IncomingRequestDeserializer {
6+
mapToSchema(
7+
data: KafkaRequest,
8+
options?: Record<string, any>,
9+
): IncomingRequest | IncomingEvent {
10+
if (!options) {
11+
return {
12+
pattern: undefined,
13+
data: undefined,
14+
};
15+
}
16+
return {
17+
pattern: options.channel,
18+
data: data?.value ?? data,
19+
};
20+
}
21+
}

packages/microservices/server/server-kafka.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {
88
NO_MESSAGE_HANDLER,
99
} from '../constants';
1010
import { KafkaContext } from '../ctx-host';
11+
import { KafkaRequestDeserializer } from '../deserializers/kafka-request.deserializer';
1112
import { KafkaHeaders, Transport } from '../enums';
1213
import { KafkaRetriableException } from '../exceptions';
1314
import {
@@ -184,7 +185,6 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
184185
});
185186
}
186187

187-
// @TODO pass packet.data.value (overide deserializer)
188188
const response$ = this.transformToObservable(
189189
await handler(packet.data, kafkaContext),
190190
);
@@ -276,4 +276,8 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
276276
this.serializer =
277277
(options && options.serializer) || new KafkaRequestSerializer();
278278
}
279+
280+
protected initializeDeserializer(options: KafkaOptions['options']) {
281+
this.deserializer = options?.deserializer ?? new KafkaRequestDeserializer();
282+
}
279283
}

0 commit comments

Comments
 (0)