Skip to content

Commit fb3db3f

Browse files
chore(): minor tweaks, align to the rest of the codebase
2 parents 8c503d3 + 35add88 commit fb3db3f

22 files changed

Lines changed: 1531 additions & 1404 deletions

integration/docker-compose.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,3 +70,4 @@ services:
7070
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
7171
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
7272
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
73+
KAFKA_DELETE_TOPIC_ENABLE: 'true'
Lines changed: 292 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,292 @@
1+
import { INestApplication, Logger } from '@nestjs/common';
2+
import { Transport } from '@nestjs/microservices';
3+
import { Test } from '@nestjs/testing';
4+
import { Admin, ITopicMetadata, Kafka } from 'kafkajs';
5+
import * as request from 'supertest';
6+
import * as util from 'util';
7+
import { KafkaConcurrentController } from '../src/kafka-concurrent/kafka-concurrent.controller';
8+
import { KafkaConcurrentMessagesController } from '../src/kafka-concurrent/kafka-concurrent.messages.controller';
9+
import { Test } from '@nestjs/testing';
10+
11+
describe('Kafka concurrent', function () {
12+
const numbersOfServers = 3;
13+
14+
const requestTopic = 'math.sum.sync.number.wait';
15+
const responseTopic = 'math.sum.sync.number.wait.reply';
16+
17+
let admin: Admin;
18+
const servers: any[] = [];
19+
const apps: INestApplication[] = [];
20+
21+
const logger = new Logger('concurrent-kafka.spec.ts');
22+
23+
// set timeout to be longer (especially for the after hook)
24+
this.timeout(30000);
25+
26+
const startServer = async () => {
27+
const module = await Test.createTestingModule({
28+
controllers: [
29+
KafkaConcurrentController,
30+
KafkaConcurrentMessagesController,
31+
],
32+
}).compile();
33+
34+
// use our own logger for a little
35+
// Logger.overrideLogger(new Logger());
36+
37+
const app = module.createNestApplication();
38+
39+
const server = app.getHttpAdapter().getInstance();
40+
41+
app.connectMicroservice({
42+
transport: Transport.KAFKA,
43+
options: {
44+
client: {
45+
brokers: ['localhost:9092'],
46+
},
47+
run: {
48+
partitionsConsumedConcurrently: numbersOfServers,
49+
},
50+
},
51+
});
52+
53+
// enable these for clean shutdown
54+
app.enableShutdownHooks();
55+
56+
// push to the collection
57+
servers.push(server);
58+
apps.push(app);
59+
60+
// await the start
61+
await app.startAllMicroservicesAsync();
62+
await app.init();
63+
};
64+
65+
it(`Create kafka topics/partitions`, async () => {
66+
const kafka = new Kafka({
67+
clientId: 'concurrent-test-admin',
68+
brokers: ['localhost:9092'],
69+
});
70+
71+
admin = kafka.admin();
72+
await admin.connect();
73+
74+
let topicMetadata: {
75+
topics: ITopicMetadata[];
76+
};
77+
78+
try {
79+
topicMetadata = await admin.fetchTopicMetadata({
80+
topics: [requestTopic, responseTopic],
81+
});
82+
} catch (e) {
83+
// create with number of servers
84+
try {
85+
await admin.createTopics({
86+
topics: [
87+
{
88+
topic: requestTopic,
89+
numPartitions: numbersOfServers,
90+
replicationFactor: 1,
91+
},
92+
{
93+
topic: responseTopic,
94+
numPartitions: numbersOfServers,
95+
replicationFactor: 1,
96+
},
97+
],
98+
});
99+
} catch (e) {
100+
logger.error(util.format('Create topics error: %o', e));
101+
}
102+
}
103+
104+
if (topicMetadata && topicMetadata.topics.length > 0) {
105+
// we have topics, how many partitions do they have?
106+
for (const topic of topicMetadata.topics) {
107+
if (topic.partitions.length < numbersOfServers) {
108+
try {
109+
await admin.createPartitions({
110+
topicPartitions: [
111+
{
112+
topic: topic.name,
113+
count: numbersOfServers,
114+
},
115+
],
116+
});
117+
} catch (e) {
118+
logger.error(util.format('Create partitions error: %o', e));
119+
}
120+
}
121+
}
122+
}
123+
124+
// create with number of servers
125+
try {
126+
await admin.createTopics({
127+
topics: [
128+
{
129+
topic: requestTopic,
130+
numPartitions: numbersOfServers,
131+
replicationFactor: 1,
132+
},
133+
{
134+
topic: responseTopic,
135+
numPartitions: numbersOfServers,
136+
replicationFactor: 1,
137+
},
138+
],
139+
});
140+
} catch (e) {
141+
logger.error(util.format('Create topics error: %o', e));
142+
}
143+
144+
// disconnect
145+
await admin.disconnect();
146+
});
147+
148+
it(`Start Kafka apps`, async () => {
149+
// start all at once
150+
await Promise.all(
151+
Array(numbersOfServers)
152+
.fill(1)
153+
.map(async (v, i) => {
154+
// return startServer();
155+
156+
// wait in intervals so the consumers start in order
157+
return new Promise(resolve => {
158+
setTimeout(async () => {
159+
await startServer();
160+
161+
return resolve();
162+
}, 1000 * i);
163+
});
164+
}),
165+
);
166+
}).timeout(30000);
167+
168+
it(`Concurrent messages without forcing a rebalance.`, async () => {
169+
// wait a second before notifying the servers to respond
170+
setTimeout(async () => {
171+
// notify the other servers that it is time to respond
172+
await Promise.all(
173+
servers.map(async server => {
174+
// send to all servers since indexes don't necessarily align with server consumers
175+
return request(server).post('/go').send();
176+
}),
177+
);
178+
}, 1000);
179+
180+
await Promise.all(
181+
servers.map(async (server, index) => {
182+
// send requests
183+
const payload = {
184+
key: index,
185+
numbers: [1, index],
186+
};
187+
const result = (1 + index).toString();
188+
189+
return request(server)
190+
.post('/mathSumSyncNumberWait')
191+
.send(payload)
192+
.expect(200)
193+
.expect(200, result);
194+
}),
195+
);
196+
});
197+
198+
it(`Close kafka client consumer while waiting for message pattern response.`, async () => {
199+
await Promise.all(
200+
servers.map(async (server, index) => {
201+
// shut off and delete the leader
202+
if (index === 0) {
203+
return new Promise(resolve => {
204+
// wait a second before closing so the producers can send the message to the server consumers
205+
setTimeout(async () => {
206+
// get the controller
207+
const controller = apps[index].get(KafkaConcurrentController);
208+
209+
// close the controller clients
210+
await controller.client.close();
211+
212+
// notify the other servers that we have stopped
213+
await Promise.all(
214+
servers.map(async server => {
215+
// send to all servers since indexes don't necessarily align with server consumers
216+
return request(server).post('/go').send();
217+
}),
218+
);
219+
220+
return resolve();
221+
}, 1000);
222+
});
223+
}
224+
225+
// send requests
226+
const payload = {
227+
key: index,
228+
numbers: [1, index],
229+
};
230+
const result = (1 + index).toString();
231+
232+
return request(server)
233+
.post('/mathSumSyncNumberWait')
234+
.send(payload)
235+
.expect(200)
236+
.expect(200, result);
237+
}),
238+
);
239+
});
240+
241+
it(`Start kafka client consumer while waiting for message pattern response.`, async () => {
242+
await Promise.all(
243+
servers.map(async (server, index) => {
244+
// shut off and delete the leader
245+
if (index === 0) {
246+
return new Promise(resolve => {
247+
// wait a second before closing so the producers can send the message to the server consumers
248+
setTimeout(async () => {
249+
// get the controller
250+
const controller = apps[index].get(KafkaConcurrentController);
251+
252+
// connect the controller client
253+
await controller.client.connect();
254+
255+
// notify the servers that we have started
256+
await Promise.all(
257+
servers.map(async server => {
258+
// send to all servers since indexes don't necessarily align with server consumers
259+
return request(server).post('/go').send();
260+
}),
261+
);
262+
263+
return resolve();
264+
}, 1000);
265+
});
266+
}
267+
268+
// send requests
269+
const payload = {
270+
key: index,
271+
numbers: [1, index],
272+
};
273+
const result = (1 + index).toString();
274+
275+
return request(server)
276+
.post('/mathSumSyncNumberWait')
277+
.send(payload)
278+
.expect(200)
279+
.expect(200, result);
280+
}),
281+
);
282+
});
283+
284+
after(`Stopping Kafka app`, async () => {
285+
// close all concurrently
286+
return Promise.all(
287+
apps.map(async app => {
288+
return app.close();
289+
}),
290+
);
291+
});
292+
});

integration/microservices/e2e/sum-kafka.spec.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,13 @@ import { UserEntity } from '../src/kafka/entities/user.entity';
99
import { KafkaController } from '../src/kafka/kafka.controller';
1010
import { KafkaMessagesController } from '../src/kafka/kafka.messages.controller';
1111

12-
describe('Kafka transport', () => {
12+
describe('Kafka transport', function () {
1313
let server;
1414
let app: INestApplication;
1515

16+
// set timeout to be longer (especially for the after hook)
17+
this.timeout(30000);
18+
1619
it(`Start Kafka app`, async () => {
1720
const module = await Test.createTestingModule({
1821
controllers: [KafkaController, KafkaMessagesController],
@@ -29,6 +32,7 @@ describe('Kafka transport', () => {
2932
},
3033
},
3134
});
35+
app.enableShutdownHooks();
3236
await app.startAllMicroservicesAsync();
3337
await app.init();
3438
}).timeout(30000);
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
export class SumDto {
2+
key: string;
3+
numbers: number[];
4+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import {
2+
Body,
3+
Controller,
4+
HttpCode,
5+
OnModuleDestroy,
6+
OnModuleInit,
7+
Post,
8+
} from '@nestjs/common';
9+
import { Logger } from '@nestjs/common/services/logger.service';
10+
import { Client, ClientKafka, Transport } from '@nestjs/microservices';
11+
import { PartitionerArgs } from 'kafkajs';
12+
import { Observable } from 'rxjs';
13+
import { SumDto } from './dto/sum.dto';
14+
15+
/**
16+
* The following function explicity sends messages to the key representing the partition.
17+
*/
18+
const explicitPartitioner = () => {
19+
return ({ message }: PartitionerArgs) => {
20+
return parseFloat(message.headers.toPartition.toString());
21+
};
22+
};
23+
24+
@Controller()
25+
export class KafkaConcurrentController
26+
implements OnModuleInit, OnModuleDestroy {
27+
protected readonly logger = new Logger(KafkaConcurrentController.name);
28+
29+
@Client({
30+
transport: Transport.KAFKA,
31+
options: {
32+
client: {
33+
brokers: ['localhost:9092'],
34+
},
35+
run: {
36+
partitionsConsumedConcurrently: 3,
37+
},
38+
producer: {
39+
createPartitioner: explicitPartitioner,
40+
},
41+
},
42+
})
43+
public readonly client: ClientKafka;
44+
45+
async onModuleInit() {
46+
const requestPatterns = ['math.sum.sync.number.wait'];
47+
48+
requestPatterns.forEach(pattern => {
49+
this.client.subscribeToResponseOf(pattern);
50+
});
51+
52+
await this.client.connect();
53+
}
54+
55+
async onModuleDestroy() {
56+
await this.client.close();
57+
}
58+
59+
@Post('mathSumSyncNumberWait')
60+
@HttpCode(200)
61+
public mathSumSyncNumberWait(@Body() data: SumDto): Observable<string> {
62+
return this.client.send('math.sum.sync.number.wait', {
63+
headers: {
64+
toPartition: data.key.toString(),
65+
},
66+
key: data.key.toString(),
67+
value: data.numbers,
68+
});
69+
}
70+
}

0 commit comments

Comments
 (0)