1- import * as util from 'util' ;
2- import { Body , Controller , Get , HttpCode , Post , Query , Logger } from '@nestjs/common' ;
1+ import { Body , Controller , HttpCode , Post , Query , OnModuleInit } from '@nestjs/common' ;
32import {
43 Client ,
54 ClientProxy ,
65 EventPattern ,
7- MessagePattern ,
8- RpcException ,
96 Transport ,
10- KafkaConsumer ,
11- ClientKafka ,
127} from '@nestjs/microservices' ;
13- import { from , Observable , of , throwError } from 'rxjs' ;
14- import { catchError , scan , delay , map } from 'rxjs/operators' ;
15- import { Producer } from '@nestjs/common/interfaces/external/kafka-options.interface ' ;
8+ import { Observable } from 'rxjs' ;
9+
10+ import * as uuid from 'uuid ' ;
1611
1712@Controller ( )
18- export class KafkaController {
13+ export class KafkaController implements OnModuleInit {
1914 static IS_NOTIFIED = false ;
15+ static MATH_SUM = 0 ;
2016
2117 @Client ( {
2218 transport : Transport . KAFKA ,
@@ -27,9 +23,12 @@ export class KafkaController {
2723 } ,
2824 } )
2925 private readonly client : ClientProxy ;
30- // private readonly client: ClientKafka;
3126
32- private serialize ( data : any ) {
27+ public async onModuleInit ( ) {
28+ await this . client . connect ( ) ;
29+ }
30+
31+ private parse ( data : any ) {
3332 data . message . key = data . message . key . toString ( ) ;
3433 data . message . value = JSON . parse ( data . message . value . toString ( ) ) ;
3534 }
@@ -39,116 +38,34 @@ export class KafkaController {
3938 async call (
4039 @Query ( 'command' ) cmd ,
4140 @Body ( ) data : number [ ] ,
42- ) : Promise < Observable < number > > {
43- await this . client . connect ( ) ;
44- // return this.client.send<number>(cmd, data);
45-
46- return this . client . emit ( 'test.one' , [
47- {
48- key : 'sum' ,
49- value : JSON . stringify ( {
50- values : data ,
51- } ) ,
52- headers : {
53- 'correlation-id' : '2bfb68bb-893a-423b-a7fa-7b568cad5b67' ,
54- } ,
55- } ,
56- ] ) . pipe ( map ( ( result ) => {
57- Logger . error ( util . format ( 'post() result %o' , result ) ) ;
58-
59- return 15 ;
60- } ) , delay ( 500000 ) ) ;
61-
62- // Logger.error(util.format('post() result %o', result));
63-
64- // return from(Promise.resolve(15)).pipe(delay(500000));
41+ ) : Promise < Observable < any > > {
42+ return this . client . emit ( 'math.sum' , data . map ( ( num ) => {
43+ return {
44+ key : uuid . v4 ( ) , // stick to a single partition
45+ value : num . toString ( ) ,
46+ } ;
47+ } ) ) ;
6548 }
6649
67- @EventPattern ( 'test.one' )
68- testOne ( data : any ) {
69- this . serialize ( data ) ;
70-
71- Logger . error ( util . format ( 'testOne() data %o' , data ) ) ;
72-
73- return from ( Promise . resolve ( 15 ) ) . pipe ( delay ( 5000 ) ) ;
74- // throw new Error('test');
50+ @EventPattern ( 'math.sum' )
51+ mathSum ( data : any ) {
52+ KafkaController . MATH_SUM += parseFloat ( data . message . value ) ;
7553 }
7654
77- // @MessagePattern (this.consumer)
78- // testTo(data: any){
79- // Logger.error(util.format('testOne() data %o', data));
80-
81- // return from(Promise.resolve(15)).pipe(delay(5000));
82- // // throw new Error('test');
83- // }
84- // testOne(...data: any){
85- // Logger.error(util.format('testOne() data %o', data));
86- // throw new Error('test');
87- // }
88-
89- // @Post ('stream')
90- // @HttpCode (200)
91- // stream(@Body() data: number[]): Observable<number> {
92- // return this.client
93- // .send<number>('streaming.sum', data)
94- // .pipe(scan((a, b) => a + b));
95- // }
96-
97- // @Post ('concurrent')
98- // @HttpCode (200)
99- // concurrent(@Body() data: number[][]): Promise<boolean> {
100- // const send = async (tab: number[]) => {
101- // const expected = tab.reduce((a, b) => a + b);
102- // const result = await this.client
103- // .send<number>('math.sum', tab)
104- // .toPromise();
105-
106- // return result === expected;
107- // };
108- // return data
109- // .map(async tab => await send(tab))
110- // .reduce(async (a, b) => (await a) && (await b));
111- // }
112-
113- // @MessagePattern ('math.*')
114- // sum(data: number[]): number {
115- // return (data || []).reduce((a, b) => a + b);
116- // }
117-
118- // @MessagePattern ('async.*')
119- // async asyncSum(data: number[]): Promise<number> {
120- // return (data || []).reduce((a, b) => a + b);
121- // }
122-
123- // @MessagePattern ('stream.*')
124- // streamSum(data: number[]): Observable<number> {
125- // return of((data || []).reduce((a, b) => a + b));
126- // }
127-
128- // @MessagePattern ('streaming.*')
129- // streaming(data: number[]): Observable<number> {
130- // return from(data);
131- // }
132-
133- // @Get ('exception')
134- // async getError() {
135- // return await this.client
136- // .send<number>('exception', {})
137- // .pipe(catchError(err => of(err)));
138- // }
139-
140- // @MessagePattern ('exception')
141- // throwError(): Observable<number> {
142- // return throwError(new RpcException('test'));
143- // }
55+ @Post ( 'notify' )
56+ async sendNotification ( ) : Promise < any > {
57+ return this . client . emit ( 'notification' , [ {
58+ key : 'notify' ,
59+ value : JSON . stringify ( {
60+ notify : true ,
61+ } ) ,
62+ } ] ) ;
63+ }
14464
145- // @Post ('notify')
146- // async sendNotification(): Promise<any> {
147- // return this.client.emit<number>('notification', true);
148- // }
65+ @EventPattern ( 'notification' )
66+ eventHandler ( data : any ) {
67+ this . parse ( data ) ;
14968
150- // @EventPattern ('notification')
151- // eventHandler(data: boolean) {
152- // KafkaController.IS_NOTIFIED = data;
153- // }
69+ KafkaController . IS_NOTIFIED = data . message . value . notify ;
70+ }
15471}
0 commit comments