Skip to content

Commit f83e830

Browse files
committed
fix(microservices): kafka parser stops modifying received message
1 parent b98e4d2 commit f83e830

3 files changed

Lines changed: 43 additions & 7 deletions

File tree

packages/microservices/helpers/kafka-parser.ts

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,22 +9,28 @@ export class KafkaParser {
99
}
1010

1111
public parse<T = any>(data: any): T {
12+
// Duplicate the object to not modify the original one (would break KafkaJS retries)
13+
const result = {
14+
...data,
15+
headers: { ...data.headers },
16+
};
17+
1218
if (!this.keepBinary) {
13-
data.value = this.decode(data.value);
19+
result.value = this.decode(data.value);
1420
}
1521

1622
if (!isNil(data.key)) {
17-
data.key = this.decode(data.key);
23+
result.key = this.decode(data.key);
1824
}
1925
if (!isNil(data.headers)) {
2026
const decodeHeaderByKey = (key: string) => {
21-
data.headers[key] = this.decode(data.headers[key]);
27+
result.headers[key] = this.decode(data.headers[key]);
2228
};
2329
Object.keys(data.headers).forEach(decodeHeaderByKey);
2430
} else {
25-
data.headers = {};
31+
result.headers = {};
2632
}
27-
return data;
33+
return result;
2834
}
2935

3036
public decode(value: Buffer): object | string | null | Buffer {

packages/microservices/test/client/client-kafka.spec.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ describe('ClientKafka', () => {
7676
message,
7777
{
7878
size: 0,
79-
value: { test: true },
79+
value: Buffer.from(JSON.stringify({ test: true })),
8080
},
8181
),
8282
heartbeat,
@@ -488,7 +488,7 @@ describe('ClientKafka', () => {
488488
expect(
489489
callback.calledWith({
490490
isDisposed: true,
491-
response: payloadDisposed.message.value,
491+
response: deserializedPayloadDisposed.message.value,
492492
err: undefined,
493493
}),
494494
).to.be.true;

packages/microservices/test/helpers/kafka-parser.spec.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,5 +126,35 @@ describe('KafkaParser', () => {
126126
},
127127
});
128128
});
129+
130+
it('parse message multiple times (simulate retry)', () => {
131+
const message = {
132+
headers: {
133+
[KafkaHeaders.CORRELATION_ID]: Buffer.from('correlation-id'),
134+
},
135+
value: Buffer.from(JSON.stringify({ prop: 'value' })),
136+
key: Buffer.from('1'),
137+
};
138+
const expectedParsedMessage = {
139+
key: '1',
140+
value: {
141+
prop: 'value',
142+
},
143+
headers: {
144+
[KafkaHeaders.CORRELATION_ID]: 'correlation-id',
145+
},
146+
};
147+
expect(kafkaParser.parse(message)).to.deep.eq(expectedParsedMessage);
148+
// Parse message again and verify it still works correctly
149+
expect(kafkaParser.parse(message)).to.deep.eq(expectedParsedMessage);
150+
// Verify message was not modified
151+
expect(message).to.deep.eq({
152+
headers: {
153+
[KafkaHeaders.CORRELATION_ID]: Buffer.from('correlation-id'),
154+
},
155+
value: Buffer.from(JSON.stringify({ prop: 'value' })),
156+
key: Buffer.from('1'),
157+
});
158+
});
129159
});
130160
});

0 commit comments

Comments
 (0)