Skip to content

Commit ced278c

Browse files
sample(grpc): update grpc sample with client streaming
1 parent 2d43719 commit ced278c

2 files changed

Lines changed: 44 additions & 12 deletions

File tree

Lines changed: 43 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,25 @@
1-
import { Controller, Get, Inject, OnModuleInit } from '@nestjs/common';
2-
import { ClientGrpc, GrpcMethod } from '@nestjs/microservices';
3-
import { Observable } from 'rxjs';
1+
import { Controller, Get, Inject, OnModuleInit, Param } from '@nestjs/common';
2+
import {
3+
ClientGrpc,
4+
GrpcMethod,
5+
GrpcStreamMethod,
6+
} from '@nestjs/microservices';
7+
import { Observable, ReplaySubject, Subject } from 'rxjs';
8+
import { toArray } from 'rxjs/operators';
49
import { HeroById } from './interfaces/hero-by-id.interface';
510
import { Hero } from './interfaces/hero.interface';
611

712
interface HeroService {
8-
findOne(data: HeroById): Observable<any>;
13+
findOne(data: HeroById): Observable<Hero>;
14+
findMany(upstream: Observable<HeroById>): Observable<Hero>;
915
}
1016

11-
@Controller()
17+
@Controller('hero')
1218
export class HeroController implements OnModuleInit {
19+
private readonly items: Hero[] = [
20+
{ id: 1, name: 'John' },
21+
{ id: 2, name: 'Doe' },
22+
];
1323
private heroService: HeroService;
1424

1525
constructor(@Inject('HERO_PACKAGE') private readonly client: ClientGrpc) {}
@@ -19,16 +29,37 @@ export class HeroController implements OnModuleInit {
1929
}
2030

2131
@Get()
22-
execute(): Observable<any> {
23-
return this.heroService.findOne({ id: 1 });
32+
getMany(): Observable<Hero[]> {
33+
const ids$ = new ReplaySubject<HeroById>();
34+
ids$.next({ id: 1 });
35+
ids$.next({ id: 2 });
36+
ids$.complete();
37+
38+
const stream = this.heroService.findMany(ids$.asObservable());
39+
return stream.pipe(toArray());
40+
}
41+
42+
@Get(':id')
43+
getById(@Param('id') id: string): Observable<Hero> {
44+
return this.heroService.findOne({ id: +id });
2445
}
2546

2647
@GrpcMethod('HeroService')
2748
findOne(data: HeroById): Hero {
28-
const items: Hero[] = [
29-
{ id: 1, name: 'John' },
30-
{ id: 2, name: 'Doe' },
31-
];
32-
return items.find(({ id }) => id === data.id);
49+
return this.items.find(({ id }) => id === data.id);
50+
}
51+
52+
@GrpcStreamMethod('HeroService')
53+
findMany(data$: Observable<HeroById>): Observable<Hero> {
54+
const hero$ = new Subject<Hero>();
55+
56+
const onNext = (heroById: HeroById) => {
57+
const item = this.items.find(({ id }) => id === heroById.id);
58+
hero$.next(item);
59+
};
60+
const onComplete = () => hero$.complete();
61+
data$.subscribe(onNext, null, onComplete);
62+
63+
return hero$.asObservable();
3364
}
3465
}

sample/04-grpc/src/hero/hero.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ package hero;
44

55
service HeroService {
66
rpc FindOne (HeroById) returns (Hero) {}
7+
rpc FindMany (stream HeroById) returns (stream Hero) {}
78
}
89

910
message HeroById {

0 commit comments

Comments
 (0)