2121
2222import it .unimi .dsi .fastutil .io .FastBufferedOutputStream ;
2323
24+ import java .io .FileNotFoundException ;
2425import java .io .FileOutputStream ;
2526import java .io .IOException ;
2627import java .io .OutputStream ;
@@ -84,17 +85,11 @@ public class RecordingOutputStream extends OutputStream {
8485 private byte [] buffer ;
8586
8687 /** current virtual position in the recording */
87- private long position ;
88+ long position ;
8889
8990 /** flag to disable recording */
9091 private boolean recording ;
9192
92- /**
93- * Reusable buffer for FastBufferedOutputStream
94- */
95- protected byte [] bufStreamBuf =
96- new byte [ FastBufferedOutputStream .DEFAULT_BUFFER_SIZE ];
97-
9893 /**
9994 * True if we're to digest content.
10095 */
@@ -132,6 +127,29 @@ public class RecordingOutputStream extends OutputStream {
132127 */
133128 protected long messageBodyBeginMark ;
134129
130+ /**
131+ * While messageBodyBeginMark is not set, the last two bytes seen.
132+ *
133+ * <p>
134+ * This class does automatic detection of http message body begin (i.e. end
135+ * of http headers). Unfortunately httpcomponents did not want to add
136+ * functionality to help us with this, see
137+ * https://issues.apache.org/jira/browse/HTTPCORE-325
138+ *
139+ * <p>
140+ * It works like this: while messageBodyBeginMark is not set, we remember
141+ * the last two bytes seen, and look at each byte we write. If the
142+ * lastTwoBytes+currentByte is "\n\r\n", or lastTwoBytes[1]+currentByte is
143+ * "\n\n" then we call markMessageBodyBegin() at the position after
144+ * currentByte.
145+ *
146+ * <p>
147+ * An assumption here is that protocols other than http don't have headers,
148+ * and for those protocols the user of this class will call
149+ * markMessageBodyBegin() at position 0 before writing anything.
150+ */
151+ protected int [] lastTwoBytes = new int [] {-1 , -1 };
152+
135153 /**
136154 * Stream to record.
137155 */
@@ -183,15 +201,18 @@ public void open(OutputStream wrappedStream) throws IOException {
183201 }
184202 clearForReuse ();
185203 this .out = wrappedStream ;
204+ startTime = System .currentTimeMillis ();
205+ }
206+
207+ protected OutputStream ensureDiskStream () throws FileNotFoundException {
186208 if (this .diskStream == null ) {
187- // TODO: Fix so we only make file when its actually needed.
188209 FileOutputStream fis = new FileOutputStream (this .backingFilename );
189-
190- this .diskStream = new RecyclingFastBufferedOutputStream (fis , bufStreamBuf );
210+ this .diskStream = new FastBufferedOutputStream (fis );
191211 }
192- startTime = System . currentTimeMillis () ;
212+ return this . diskStream ;
193213 }
194214
215+
195216 public void write (int b ) throws IOException {
196217 if (position <maxPosition ) {
197218 // revisiting previous content; do nothing but advance position
@@ -204,6 +225,20 @@ public void write(int b) throws IOException {
204225 if (this .out != null ) {
205226 this .out .write (b );
206227 }
228+
229+ // see comment on int[] lastTwoBytes
230+ if (messageBodyBeginMark < 0l ) {
231+ // looking for "\n\n" or "\n\r\n"
232+ if (b == '\n'
233+ && (lastTwoBytes [1 ] == '\n'
234+ || (lastTwoBytes [0 ] == '\n' && lastTwoBytes [1 ] == '\r' ))) {
235+ markMessageBodyBegin ();
236+ } else {
237+ lastTwoBytes [0 ] = lastTwoBytes [1 ];
238+ lastTwoBytes [1 ] = b ;
239+ }
240+ }
241+
207242 checkLimits ();
208243 }
209244
@@ -220,6 +255,14 @@ public void write(byte[] b, int off, int len) throws IOException {
220255 off += consumeRange ;
221256 len -= consumeRange ;
222257 }
258+
259+ // see comment on int[] lastTwoBytes
260+ while (messageBodyBeginMark < 0 && len > 0 ) {
261+ write (b [off ]);
262+ off ++;
263+ len --;
264+ }
265+
223266 if (recording ) {
224267 record (b , off , len );
225268 }
@@ -251,7 +294,7 @@ protected void checkLimits() throws RecorderIOException {
251294 throw new RecorderTimeoutException ();
252295 }
253296 // need to throttle reading to hit max configured rate?
254- if (position /duration > maxRateBytesPerMs ) {
297+ if (position /duration >= maxRateBytesPerMs ) {
255298 long desiredDuration = position / maxRateBytesPerMs ;
256299 try {
257300 Thread .sleep (desiredDuration -duration );
@@ -274,10 +317,7 @@ private void record(int b) throws IOException {
274317 this .digest .update ((byte )b );
275318 }
276319 if (this .position >= this .buffer .length ) {
277- // TODO: Its possible to call write w/o having first opened a
278- // stream. Protect ourselves against this.
279- assert this .diskStream != null : "Diskstream is null" ;
280- this .diskStream .write (b );
320+ this .ensureDiskStream ().write (b );
281321 } else {
282322 this .buffer [(int ) this .position ] = (byte ) b ;
283323 }
@@ -312,12 +352,7 @@ private void record(byte[] b, int off, int len) throws IOException {
312352 */
313353 private void tailRecord (byte [] b , int off , int len ) throws IOException {
314354 if (this .position >= this .buffer .length ){
315- // TODO: Its possible to call write w/o having first opened a
316- // stream. Lets protect ourselves against this.
317- if (this .diskStream == null ) {
318- throw new IOException ("diskstream is null" );
319- }
320- this .diskStream .write (b , off , len );
355+ this .ensureDiskStream ().write (b , off , len );
321356 this .position += len ;
322357 } else {
323358 assert this .buffer != null : "Buffer is null" ;
@@ -557,6 +592,18 @@ public long getRemainingLength() {
557592 return maxLength - position ;
558593 }
559594
595+ /**
596+ * Forget about anything past the point where the content-body starts. This
597+ * is needed to support FetchHTTP's shouldFetchBody setting. See also the
598+ * docs on {@link #lastTwoBytes}
599+ */
600+ public void chopAtMessageBodyBegin () {
601+ if (messageBodyBeginMark >= 0 ) {
602+ this .size = messageBodyBeginMark ;
603+ this .position = messageBodyBeginMark ;
604+ }
605+ }
606+
560607 public void clearForReuse () throws IOException {
561608 this .out = null ;
562609 this .position = 0 ;
0 commit comments