Skip to content

Commit d96b8eb

Browse files
committed
Moves kafka logger into its own helper.
1 parent be239b5 commit d96b8eb

6 files changed

Lines changed: 138 additions & 58 deletions

File tree

packages/microservices/client/client-kafka.ts

Lines changed: 2 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import {
1010
import { ReadPacket, KafkaOptions, WritePacket, PacketId } from '../interfaces';
1111
import { ClientProxy } from './client-proxy';
1212

13-
import { KafkaSerializer, KafkaRoundRobinByTimePartitionAssigner } from '../helpers';
13+
import { KafkaSerializer, KafkaRoundRobinByTimePartitionAssigner, KafkaLogger } from '../helpers';
1414

1515
import {
1616
KafkaConfig,
@@ -21,7 +21,6 @@ import {
2121
EachMessagePayload,
2222
Message,
2323
KafkaMessage,
24-
logLevel,
2524
ConsumerGroupJoinEvent
2625
} from '../external/kafka.interface';
2726
import { KafkaHeaders } from '../enums';
@@ -118,34 +117,10 @@ export class ClientKafka extends ClientProxy {
118117
}
119118

120119
public createClient<T = any>(): T {
121-
const kafkaLogger = kafkaLogLevel => ({namespace, level, label, log}) => {
122-
let loggerMethod: string;
123-
124-
switch (level) {
125-
case logLevel.ERROR:
126-
case logLevel.NOTHING:
127-
loggerMethod = 'error';
128-
break;
129-
case logLevel.WARN:
130-
loggerMethod = 'warn';
131-
break;
132-
case logLevel.INFO:
133-
loggerMethod = 'log';
134-
break;
135-
case logLevel.DEBUG:
136-
default:
137-
loggerMethod = 'debug';
138-
break;
139-
}
140-
141-
const { message, ...others } = log;
142-
this.logger[loggerMethod](`${label} [${namespace}] ${message} ${JSON.stringify(others)}`);
143-
};
144-
145120
return new kafkaPackage.Kafka(Object.assign(this.options.client || {}, {
146121
clientId: this.clientId,
147122
brokers: this.brokers,
148-
logCreator: kafkaLogger,
123+
logCreator: KafkaLogger.bind(null, this.logger),
149124
}) as KafkaConfig);
150125
}
151126

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
export * from './json-socket';
22
export * from './kafka-serializer';
3+
export * from './kafka-logger';
34
export * from './kafka-round-robin-by-time-partition-assigner';
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import {
2+
logLevel
3+
} from '../external/kafka.interface';
4+
5+
export const KafkaLogger = (logger: any, kafkaLogLevel) => ({namespace, level, label, log}) => {
6+
let loggerMethod: string;
7+
8+
switch (level) {
9+
case logLevel.ERROR:
10+
case logLevel.NOTHING:
11+
loggerMethod = 'error';
12+
break;
13+
case logLevel.WARN:
14+
loggerMethod = 'warn';
15+
break;
16+
case logLevel.INFO:
17+
loggerMethod = 'log';
18+
break;
19+
case logLevel.DEBUG:
20+
default:
21+
loggerMethod = 'debug';
22+
break;
23+
}
24+
25+
const { message, ...others } = log;
26+
logger[loggerMethod](`${label} [${namespace}] ${message} ${JSON.stringify(others)}`);
27+
};

packages/microservices/server/server-kafka.ts

Lines changed: 2 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import { CustomTransportStrategy, KafkaOptions, ReadPacket, PacketId, WritePacke
2121
import { KafkaHeaders } from '../enums';
2222
import { Server } from './server';
2323

24-
import { KafkaSerializer } from '../helpers/kafka-serializer';
24+
import { KafkaSerializer, KafkaLogger } from '../helpers';
2525

2626
let kafkaPackage: any = {};
2727

@@ -79,34 +79,10 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
7979
}
8080

8181
public createClient<T = any>(): T {
82-
const kafkaLogger = kafkaLogLevel => ({namespace, level, label, log}) => {
83-
let loggerMethod: string;
84-
85-
switch (level) {
86-
case logLevel.ERROR:
87-
case logLevel.NOTHING:
88-
loggerMethod = 'error';
89-
break;
90-
case logLevel.WARN:
91-
loggerMethod = 'warn';
92-
break;
93-
case logLevel.INFO:
94-
loggerMethod = 'log';
95-
break;
96-
case logLevel.DEBUG:
97-
default:
98-
loggerMethod = 'debug';
99-
break;
100-
}
101-
102-
const { message, ...others } = log;
103-
this.logger[loggerMethod](`${label} [${namespace}] ${message} ${JSON.stringify(others)}`);
104-
};
105-
10682
return new kafkaPackage.Kafka(Object.assign(this.options.client || {}, {
10783
clientId: this.clientId,
10884
brokers: this.brokers,
109-
logCreator: kafkaLogger,
85+
logCreator: KafkaLogger.bind(null, this.logger),
11086
}) as KafkaConfig);
11187
}
11288

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
import { expect } from 'chai';
2+
import * as sinon from 'sinon';
3+
import { KafkaLogger } from '../../helpers/kafka-logger';
4+
import { logLevel } from '../../external/kafka.interface';
5+
6+
const namespace = 'namespace';
7+
const label = 'label';
8+
const entry = {
9+
message: 'message',
10+
other: {
11+
stuff: 'here'
12+
}
13+
};
14+
15+
describe('kafka logger', () => {
16+
let kafkaLogger: any;
17+
18+
let error: sinon.SinonSpy;
19+
let warn: sinon.SinonSpy;
20+
let log: sinon.SinonSpy;
21+
let debug: sinon.SinonSpy;
22+
23+
beforeEach(() => {
24+
// set
25+
error = sinon.spy();
26+
warn = sinon.spy();
27+
log = sinon.spy();
28+
debug = sinon.spy();
29+
30+
kafkaLogger = KafkaLogger({
31+
error,
32+
warn,
33+
log,
34+
debug
35+
}, logLevel.DEBUG);
36+
});
37+
38+
it('error', () => {
39+
kafkaLogger({
40+
namespace,
41+
level: logLevel.ERROR,
42+
label,
43+
log: entry
44+
});
45+
46+
expect(error.calledOnce).to.be.true;
47+
expect(error.args[0][0]).to.eq('label [namespace] message {"other":{"stuff":"here"}}');
48+
});
49+
50+
it('nothing', () => {
51+
kafkaLogger({
52+
namespace,
53+
level: logLevel.NOTHING,
54+
label,
55+
log: entry
56+
});
57+
58+
expect(error.calledOnce).to.be.true;
59+
});
60+
61+
it('warn', () => {
62+
kafkaLogger({
63+
namespace,
64+
level: logLevel.WARN,
65+
label,
66+
log: entry
67+
});
68+
69+
expect(warn.calledOnce).to.be.true;
70+
});
71+
72+
it('info', () => {
73+
kafkaLogger({
74+
namespace,
75+
level: logLevel.INFO,
76+
label,
77+
log: entry
78+
});
79+
80+
expect(log.calledOnce).to.be.true;
81+
});
82+
83+
it('debug', () => {
84+
kafkaLogger({
85+
namespace,
86+
level: logLevel.DEBUG,
87+
label,
88+
log: entry
89+
});
90+
91+
expect(debug.calledOnce).to.be.true;
92+
});
93+
});

packages/microservices/test/server/server-kafka.spec.ts

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ class NoopLogger extends Logger {
1616

1717
describe('ServerKafka', () => {
1818
let server: ServerKafka;
19-
2019
let callback: sinon.SinonSpy;
2120
let bindEventsStub: sinon.SinonStub;
2221
let connect: sinon.SinonSpy;
@@ -25,6 +24,7 @@ describe('ServerKafka', () => {
2524
let consumerStub: sinon.SinonStub;
2625
let producerStub: sinon.SinonStub;
2726
let client;
27+
2828
beforeEach(() => {
2929
server = new ServerKafka({});
3030
callback = sinon.spy();
@@ -33,17 +33,17 @@ describe('ServerKafka', () => {
3333
run = sinon.spy();
3434

3535
consumerStub = sinon.stub(server, 'consumer')
36-
.callsFake( () => {
36+
.callsFake(() => {
3737
return {
3838
connect,
3939
subscribe,
4040
run,
4141
};
4242
});
4343
producerStub = sinon.stub(server, 'producer')
44-
.callsFake( () => {
44+
.callsFake(() => {
4545
return {
46-
connect,
46+
connect
4747
};
4848
});
4949
client = {
@@ -78,16 +78,24 @@ describe('ServerKafka', () => {
7878
};
7979

8080
describe('close', () => {
81+
const consumer = {disconnect: sinon.spy()};
82+
const producer = {disconnect: sinon.spy()};
83+
beforeEach(() => {
84+
(server as any).consumer = consumer;
85+
(server as any).producer = producer;
86+
});
8187
it('should close server', () => {
8288
server.close();
89+
90+
expect(consumer.disconnect.calledOnce).to.be.true;
91+
expect(producer.disconnect.calledOnce).to.be.true;
8392
expect(server.consumer).to.be.null;
8493
expect(server.producer).to.be.null;
8594
expect(server.client).to.be.null;
8695
});
8796
});
8897

8998
describe('listen', () => {
90-
9199
it('should call "bindEvents"', async () => {
92100
bindEventsStub = sinon
93101
.stub(server, 'bindEvents')

0 commit comments

Comments
 (0)