Skip to content

Commit 18c27cd

Browse files
committed
fix: use pipeline over stream.pipe
`pipeline` ends up destroying streams used if there is an error in one of the streams. Due to this, there's no chance of a memory leak from errored out streams. There's also now an addition of adding an error handler to the `StreamableFile` so that stream errors by default return a 400 and can be customized to return an error message however a developer would like. These only effect the express adapter because of how Fastify already internally handles streams. fix: nestjs#9759
1 parent 61b11ad commit 18c27cd

5 files changed

Lines changed: 52 additions & 1 deletion

File tree

integration/send-files/e2e/express.spec.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,4 +65,7 @@ describe('Express FileSend', () => {
6565
expect(res.text).to.be.eq(readmeString);
6666
});
6767
});
68+
it('should return an error if the file does not exist', async () => {
69+
return request(app.getHttpServer()).get('/file/not/exist').expect(400);
70+
});
6871
});

integration/send-files/src/app.controller.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,9 @@ export class AppController {
3131
getFileWithHeaders(): StreamableFile {
3232
return this.appService.getFileWithHeaders();
3333
}
34+
35+
@Get('file/not/exist')
36+
getNonExistantFile(): StreamableFile {
37+
return this.appService.getFileThatDoesNotExist();
38+
}
3439
}

integration/send-files/src/app.service.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,8 @@ export class AppService {
3535
},
3636
);
3737
}
38+
39+
getFileThatDoesNotExist(): StreamableFile {
40+
return new StreamableFile(createReadStream('does-not-exist.txt'));
41+
}
3842
}

packages/common/file-stream/streamable-file.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,22 @@ import { types } from 'util';
33
import { isFunction } from '../utils/shared.utils';
44
import { StreamableFileOptions } from './streamable-options.interface';
55

6+
interface StreamableHandlerResponse {
7+
statusCode: number;
8+
send: (msg: string) => void;
9+
}
10+
611
export class StreamableFile {
712
private readonly stream: Readable;
813

14+
private handler: (err: Error, response: StreamableHandlerResponse) => void = (
15+
err: Error,
16+
res,
17+
) => {
18+
res.statusCode = 400;
19+
res.send(err.message);
20+
};
21+
922
constructor(buffer: Uint8Array, options?: StreamableFileOptions);
1023
constructor(readable: Readable, options?: StreamableFileOptions);
1124
constructor(
@@ -38,4 +51,18 @@ export class StreamableFile {
3851
length,
3952
};
4053
}
54+
55+
get errorHandler(): (
56+
err: Error,
57+
response: StreamableHandlerResponse,
58+
) => void {
59+
return this.handler;
60+
}
61+
62+
setErrorHandler(
63+
handler: (err: Error, response: StreamableHandlerResponse) => void,
64+
) {
65+
this.handler = handler;
66+
return this;
67+
}
4168
}

packages/platform-express/adapters/express-adapter.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import {
22
InternalServerErrorException,
3+
Logger,
34
RawBodyRequest,
45
RequestMethod,
56
StreamableFile,
@@ -33,6 +34,7 @@ import * as cors from 'cors';
3334
import * as express from 'express';
3435
import * as http from 'http';
3536
import * as https from 'https';
37+
import { PassThrough, pipeline } from 'stream';
3638
import { ServeStaticOptions } from '../interfaces/serve-static-options.interface';
3739

3840
type VersionedRoute = <
@@ -78,7 +80,17 @@ export class ExpressAdapter extends AbstractHttpAdapter {
7880
) {
7981
response.setHeader('Content-Length', streamHeaders.length);
8082
}
81-
return body.getStream().pipe(response);
83+
return pipeline(
84+
body.getStream().on('error', (err: Error) => {
85+
body.errorHandler(err, response);
86+
}),
87+
response,
88+
(err: Error) => {
89+
if (err) {
90+
new Logger('ExpressAdapter').error(err.message, err.stack);
91+
}
92+
},
93+
);
8294
}
8395
return isObject(body) ? response.json(body) : response.send(String(body));
8496
}

0 commit comments

Comments
 (0)