Skip to content

Commit 4763fb9

Browse files
committed
feat(microservices): Removes unused exception and updates tests
1 parent 68ad994 commit 4763fb9

5 files changed

Lines changed: 33 additions & 26 deletions

File tree

packages/microservices/client/client-kafka.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import {
88
} from '../constants';
99
import { KafkaResponseDeserializer } from '../deserializers/kafka-response.deserializer';
1010
import { KafkaHeaders } from '../enums';
11-
import { InvalidKafkaClientTopicPartitionException } from '../errors/invalid-kafka-client-topic-partition.exception';
1211
import { InvalidKafkaClientTopicException } from '../errors/invalid-kafka-client-topic.exception';
1312
import {
1413
BrokersFunction,

packages/microservices/errors/invalid-kafka-client-topic-partition.exception.ts

Lines changed: 0 additions & 9 deletions
This file was deleted.

packages/microservices/helpers/kafka-reply-partition-assigner.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ export class KafkaReplyPartitionAssigner {
1818
constructor(
1919
private readonly clientKafka: ClientKafka,
2020
private readonly config: {
21-
groupId: string;
2221
cluster: Cluster;
2322
},
2423
) {

packages/microservices/test/client/client-kafka.spec.ts

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ import * as sinon from 'sinon';
33
import { ClientKafka } from '../../client/client-kafka';
44
import { NO_MESSAGE_HANDLER } from '../../constants';
55
import { KafkaHeaders } from '../../enums';
6-
import { InvalidKafkaClientTopicPartitionException } from '../../errors/invalid-kafka-client-topic-partition.exception';
76
import { InvalidKafkaClientTopicException } from '../../errors/invalid-kafka-client-topic.exception';
87
import {
98
ConsumerGroupJoinEvent,
@@ -269,6 +268,7 @@ describe('ClientKafka', () => {
269268

270269
expect(createClientStub.calledOnce).to.be.true;
271270
expect(producerStub.calledOnce).to.be.true;
271+
272272
expect(consumerStub.calledOnce).to.be.true;
273273

274274
expect(on.calledOnce).to.be.true;
@@ -314,13 +314,19 @@ describe('ClientKafka', () => {
314314
memberId: 'member-1',
315315
memberAssignment: {
316316
'topic-a': [0, 1, 2],
317+
'topic-b': [3, 4, 5],
317318
},
318319
},
319320
};
320321

321322
client['setConsumerAssignments'](consumerAssignments);
323+
322324
expect(client['consumerAssignments']).to.deep.eq(
323-
consumerAssignments.payload.memberAssignment,
325+
// consumerAssignments.payload.memberAssignment,
326+
{
327+
'topic-a': 0,
328+
'topic-b': 3,
329+
},
324330
);
325331
});
326332
});
@@ -493,30 +499,40 @@ describe('ClientKafka', () => {
493499
});
494500
});
495501

502+
describe('getConsumerAssignments', () => {
503+
it('should get consumer assignments', () => {
504+
client['consumerAssignments'] = {
505+
[replyTopic]: 0,
506+
};
507+
508+
const result = client.getConsumerAssignments();
509+
510+
expect(result).to.deep.eq(client['consumerAssignments']);
511+
});
512+
});
513+
496514
describe('getReplyTopicPartition', () => {
497515
it('should get reply partition', () => {
498516
client['consumerAssignments'] = {
499-
[replyTopic]: [0],
517+
[replyTopic]: 0,
500518
};
501519

502520
const result = client['getReplyTopicPartition'](replyTopic);
503521

504522
expect(result).to.eq('0');
505523
});
506524

507-
it('should throw error when the topic is being consumed but is not assigned partitions', () => {
508-
client['consumerAssignments'] = {
509-
[replyTopic]: [],
510-
};
525+
it('should throw error when the topic is not being consumed', () => {
526+
client['consumerAssignments'] = {};
511527

512528
expect(() => client['getReplyTopicPartition'](replyTopic)).to.throw(
513-
InvalidKafkaClientTopicPartitionException,
529+
InvalidKafkaClientTopicException,
514530
);
515531
});
516532

517-
it('should throw error when the topic is not being consumer', () => {
533+
it('should throw error when the topic is not being consumed', () => {
518534
client['consumerAssignments'] = {
519-
[topic]: [],
535+
[topic]: undefined,
520536
};
521537

522538
expect(() => client['getReplyTopicPartition'](replyTopic)).to.throw(
@@ -568,7 +584,7 @@ describe('ClientKafka', () => {
568584

569585
// set
570586
client['consumerAssignments'] = {
571-
[replyTopic]: [parseFloat(replyPartition)],
587+
[replyTopic]: parseFloat(replyPartition),
572588
};
573589
});
574590

packages/microservices/test/helpers/kafka-round-robin-by-time-partition-assigner.spec.ts renamed to packages/microservices/test/helpers/kafka-reply-partition-assigner.spec.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
import { expect } from 'chai';
22
import * as Kafka from 'kafkajs';
3-
import { KafkaRoundRobinPartitionAssigner } from '../../helpers/kafka-round-robin-partition-assigner';
3+
import { KafkaReplyPartitionAssigner } from '../../helpers/kafka-reply-partition-assigner';
4+
import { ClientKafka } from '../../client/client-kafka';
45

5-
describe('kafka round robin by time', () => {
6-
let cluster, topics, metadata, assigner;
6+
describe('kafka reply partition assigner', () => {
7+
let cluster, topics, metadata, assigner, client;
78

89
beforeEach(() => {
910
metadata = {};
1011
cluster = { findTopicPartitionMetadata: topic => metadata[topic] };
11-
assigner = new KafkaRoundRobinPartitionAssigner({ cluster });
12+
client = new ClientKafka({});
13+
assigner = new KafkaReplyPartitionAssigner(client, { cluster });
1214
topics = ['topic-A', 'topic-B'];
1315
});
1416

0 commit comments

Comments
 (0)