Skip to content

Commit 816bc91

Browse files
Merge pull request nestjs#9954 from kosh-b/feature/kafka-heartbeat-passed-to-endpoint
feat(microservices): add Kafka heartbeat callback to KafkaContext
2 parents df4d99d + 380d230 commit 816bc91

6 files changed

Lines changed: 42 additions & 2 deletions

File tree

packages/microservices/ctx-host/kafka.context.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ type KafkaContextArgs = [
66
partition: number,
77
topic: string,
88
consumer: Consumer,
9+
heartbeat: () => Promise<void>,
910
];
1011

1112
export class KafkaContext extends BaseRpcContext<KafkaContextArgs> {
@@ -40,4 +41,11 @@ export class KafkaContext extends BaseRpcContext<KafkaContextArgs> {
4041
getConsumer() {
4142
return this.args[3];
4243
}
44+
45+
/**
46+
* Returns the Kafka heartbeat callback.
47+
*/
48+
getHeartbeat() {
49+
return this.args[4];
50+
}
4351
}

packages/microservices/external/kafka.interface.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -870,6 +870,8 @@ export interface EachMessagePayload {
870870
topic: string;
871871
partition: number;
872872
message: KafkaMessage;
873+
heartbeat(): Promise<void>;
874+
pause(): () => void;
873875
}
874876

875877
export interface EachBatchPayload {

packages/microservices/server/server-kafka.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
167167
payload.partition,
168168
payload.topic,
169169
this.consumer,
170+
payload.heartbeat,
170171
]);
171172
const handler = this.getHandlerByPattern(packet.pattern);
172173
// if the correlation id or reply topic is not set

packages/microservices/test/client/client-kafka.spec.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ describe('ClientKafka', () => {
2222
const timestamp = new Date().toISOString();
2323
const attributes = 1;
2424
const messageValue = 'test-message';
25+
const heartbeat = async () => {};
26+
const pause = () => () => {};
2527

2628
// message
2729
const message: KafkaMessage = {
@@ -57,6 +59,8 @@ describe('ClientKafka', () => {
5759
},
5860
message,
5961
),
62+
heartbeat,
63+
pause,
6064
};
6165

6266
const payloadDisposed: EachMessagePayload = {
@@ -75,6 +79,8 @@ describe('ClientKafka', () => {
7579
value: { test: true },
7680
},
7781
),
82+
heartbeat,
83+
pause,
7884
};
7985

8086
const payloadError: EachMessagePayload = {
@@ -93,6 +99,8 @@ describe('ClientKafka', () => {
9399
value: null,
94100
},
95101
),
102+
heartbeat,
103+
pause,
96104
};
97105

98106
const payloadWithoutCorrelation: EachMessagePayload = {
@@ -104,6 +112,8 @@ describe('ClientKafka', () => {
104112
},
105113
message,
106114
),
115+
heartbeat,
116+
pause,
107117
};
108118

109119
// deserialized payload
@@ -118,6 +128,8 @@ describe('ClientKafka', () => {
118128
},
119129
deserializedMessage,
120130
),
131+
heartbeat,
132+
pause,
121133
};
122134

123135
const deserializedPayloadDisposed: EachMessagePayload = {
@@ -136,6 +148,8 @@ describe('ClientKafka', () => {
136148
value: { test: true },
137149
},
138150
),
151+
heartbeat,
152+
pause,
139153
};
140154

141155
const deserializedPayloadError: EachMessagePayload = {
@@ -154,6 +168,8 @@ describe('ClientKafka', () => {
154168
value: null,
155169
},
156170
),
171+
heartbeat,
172+
pause,
157173
};
158174

159175
let client: ClientKafka;

packages/microservices/test/ctx-host/kafka.context.spec.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@ import { KafkaContext } from '../../ctx-host';
33
import { Consumer, KafkaMessage } from '../../external/kafka.interface';
44

55
describe('KafkaContext', () => {
6-
const args = ['test', { test: true }, undefined, { test: 'consumer' }];
6+
const args = ['test', { test: true }, undefined, { test: 'consumer' }, () => {}];
77
let context: KafkaContext;
88

99
beforeEach(() => {
1010
context = new KafkaContext(
11-
args as [KafkaMessage, number, string, Consumer],
11+
args as [KafkaMessage, number, string, Consumer, () => Promise<void>],
1212
);
1313
});
1414
describe('getTopic', () => {
@@ -31,4 +31,9 @@ describe('KafkaContext', () => {
3131
expect(context.getConsumer()).to.deep.eq({ test: 'consumer' });
3232
});
3333
});
34+
describe('getHeartbeat', () => {
35+
it('should return heartbeat callback', () => {
36+
expect(context.getHeartbeat()).to.be.eql(args[4]);
37+
});
38+
});
3439
});

packages/microservices/test/server/server-kafka.spec.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ describe('ServerKafka', () => {
2626
const key = '1';
2727
const timestamp = new Date().toISOString();
2828
const messageValue = 'test-message';
29+
const heartbeat = async () => {};
30+
const pause = () => () => {};
2931

3032
const eventMessage: KafkaMessage = {
3133
key: Buffer.from(key),
@@ -44,6 +46,8 @@ describe('ServerKafka', () => {
4446
},
4547
eventMessage,
4648
),
49+
heartbeat,
50+
pause,
4751
};
4852

4953
const eventWithCorrelationIdPayload: EachMessagePayload = {
@@ -57,6 +61,8 @@ describe('ServerKafka', () => {
5761
},
5862
eventMessage,
5963
),
64+
heartbeat,
65+
pause,
6066
};
6167

6268
const message: KafkaMessage = Object.assign(
@@ -73,6 +79,8 @@ describe('ServerKafka', () => {
7379
topic,
7480
partition: 0,
7581
message,
82+
heartbeat,
83+
pause,
7684
};
7785

7886
let server: ServerKafka;

0 commit comments

Comments
 (0)