Skip to content

Commit 8e33ab7

Browse files
committed
Merge pull request #4 from nlevitt/hc43
changes needed for move to httpcomponents
2 parents 7ff3d4d + 7c6c673 commit 8e33ab7

4 files changed

Lines changed: 144 additions & 22 deletions

File tree

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
<groupId>org.archive</groupId>
66
<artifactId>ia-web-commons</artifactId>
7-
<version>1.1.0</version>
7+
<version>1.1.1-SNAPSHOT</version>
88
<packaging>jar</packaging>
99

1010
<name>ia-web-commons</name>

src/main/java/org/archive/io/RecordingInputStream.java

Lines changed: 77 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,10 @@ public RecordingInputStream(int bufferSize, String backingFilename)
7474
}
7575

7676
public void open(InputStream wrappedStream) throws IOException {
77-
logger.fine(Thread.currentThread().getName() + " opening " +
78-
wrappedStream + ", " + Thread.currentThread().getName());
77+
if (logger.isLoggable(Level.FINE)) {
78+
logger.fine("wrapping " + wrappedStream + " in thread "
79+
+ Thread.currentThread().getName());
80+
}
7981
if(isOpen()) {
8082
// error; should not be opening/wrapping in an unclosed
8183
// stream remains open
@@ -135,11 +137,11 @@ public int read(byte[] b) throws IOException {
135137

136138
public void close() throws IOException {
137139
if (logger.isLoggable(Level.FINE)) {
138-
logger.fine(Thread.currentThread().getName() + " closing " +
139-
this.in + ", " + Thread.currentThread().getName());
140+
logger.fine("closing " + this.in + " in thread "
141+
+ Thread.currentThread().getName());
140142
}
141143
IOUtils.closeQuietly(this.in);
142-
this.in = null;
144+
this.in = null;
143145
IOUtils.closeQuietly(this.recordingOutputStream);
144146
}
145147

@@ -159,20 +161,77 @@ public long readFully() throws IOException {
159161
return this.recordingOutputStream.getSize();
160162
}
161163

164+
public void readToEndOfContent(long contentLength)
165+
throws IOException, InterruptedException {
166+
// Check we're open before proceeding.
167+
if (!isOpen()) {
168+
// TODO: should this be a noisier exception-raising error?
169+
return;
170+
}
171+
172+
long totalBytes = recordingOutputStream.position - recordingOutputStream.getMessageBodyBegin();
173+
long bytesRead = -1L;
174+
long maxToRead = -1;
175+
while (contentLength <= 0 || totalBytes < contentLength) {
176+
try {
177+
// read no more than soft max
178+
maxToRead = (contentLength <= 0)
179+
? drainBuffer.length
180+
: Math.min(drainBuffer.length, contentLength - totalBytes);
181+
// nor more than hard max
182+
maxToRead = Math.min(maxToRead, recordingOutputStream.getRemainingLength());
183+
// but always at least 1 (to trigger hard max exception) XXX wtf is this?
184+
maxToRead = Math.max(maxToRead, 1);
185+
186+
bytesRead = read(drainBuffer,0,(int)maxToRead);
187+
if (bytesRead == -1) {
188+
break;
189+
}
190+
totalBytes += bytesRead;
191+
192+
if (Thread.interrupted()) {
193+
throw new InterruptedException("Interrupted during IO");
194+
}
195+
} catch (SocketTimeoutException e) {
196+
// A socket timeout is just a transient problem, meaning
197+
// nothing was available in the configured timeout period,
198+
// but something else might become available later.
199+
// Take this opportunity to check the overall
200+
// timeout (below). One reason for this timeout is
201+
// servers that keep up the connection, 'keep-alive', even
202+
// though we asked them to not keep the connection open.
203+
if (logger.isLoggable(Level.FINE)) {
204+
logger.log(Level.FINE, "socket timeout", e);
205+
}
206+
// check for interrupt
207+
if (Thread.interrupted()) {
208+
throw new InterruptedException("Interrupted during IO");
209+
}
210+
// check for overall timeout
211+
recordingOutputStream.checkLimits();
212+
} catch (SocketException se) {
213+
throw se;
214+
} catch (NullPointerException e) {
215+
// [ 896757 ] NPEs in Andy's Th-Fri Crawl.
216+
// A crawl was showing NPE's in this part of the code but can
217+
// not reproduce. Adding this rethrowing catch block w/
218+
// diagnostics to help should we come across the problem in the
219+
// future.
220+
throw new NullPointerException("Stream " + this.in + ", " +
221+
e.getMessage() + " " + Thread.currentThread().getName());
222+
}
223+
}
224+
}
225+
162226
/**
163227
* Read all of a stream (Or read until we timeout or have read to the max).
164228
* @param softMaxLength Maximum length to read; if zero or < 0, then no
165229
* limit. If met, return normally.
166-
* @param hardMaxLength Maximum length to read; if zero or < 0, then no
167-
* limit. If exceeded, throw RecorderLengthExceededException
168-
* @param timeout Timeout in milliseconds for total read; if zero or
169-
* negative, timeout is <code>Long.MAX_VALUE</code>. If exceeded, throw
170-
* RecorderTimeoutException
171-
* @param maxBytesPerMs How many bytes per millisecond.
172230
* @throws IOException failed read.
173231
* @throws RecorderLengthExceededException
174232
* @throws RecorderTimeoutException
175233
* @throws InterruptedException
234+
* @deprecated
176235
*/
177236
public void readFullyOrUntil(long softMaxLength)
178237
throws IOException, RecorderLengthExceededException,
@@ -349,6 +408,13 @@ public int getRecordedBufferLength() {
349408
return recordingOutputStream.getBufferLength();
350409
}
351410

411+
/**
412+
* See doc on {@link RecordingOutputStream#chopAtMessageBodyBegin()}
413+
*/
414+
public void chopAtMessageBodyBegin() {
415+
recordingOutputStream.chopAtMessageBodyBegin();
416+
}
417+
352418
public void clearForReuse() throws IOException {
353419
recordingOutputStream.clearForReuse();
354420
}

src/main/java/org/archive/io/RecordingOutputStream.java

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public class RecordingOutputStream extends OutputStream {
8484
private byte[] buffer;
8585

8686
/** current virtual position in the recording */
87-
private long position;
87+
long position;
8888

8989
/** flag to disable recording */
9090
private boolean recording;
@@ -132,6 +132,29 @@ public class RecordingOutputStream extends OutputStream {
132132
*/
133133
protected long messageBodyBeginMark;
134134

135+
/**
136+
* While messageBodyBeginMark is not set, the last two bytes seen.
137+
*
138+
* <p>
139+
* This class does automatic detection of http message body begin (i.e. end
140+
* of http headers). Unfortunately httpcomponents did not want to add
141+
* functionality to help us with this, see
142+
* https://issues.apache.org/jira/browse/HTTPCORE-325
143+
*
144+
* <p>
145+
* It works like this: while messageBodyBeginMark is not set, we remember
146+
* the last two bytes seen, and look at each byte we write. If the
147+
* lastTwoBytes+currentByte is "\n\r\n", or lastTwoBytes[1]+currentByte is
148+
* "\n\n" then we call markMessageBodyBegin() at the position after
149+
* currentByte.
150+
*
151+
* <p>
152+
* An assumption here is that protocols other than http don't have headers,
153+
* and for those protocols the user of this class will call
154+
* markMessageBodyBegin() at position 0 before writing anything.
155+
*/
156+
protected int[] lastTwoBytes = new int[] {-1, -1};
157+
135158
/**
136159
* Stream to record.
137160
*/
@@ -204,6 +227,20 @@ public void write(int b) throws IOException {
204227
if (this.out != null) {
205228
this.out.write(b);
206229
}
230+
231+
// see comment on int[] lastTwoBytes
232+
if (messageBodyBeginMark < 0l) {
233+
// looking for "\n\n" or "\n\r\n"
234+
if (b == '\n'
235+
&& (lastTwoBytes[1] == '\n'
236+
|| (lastTwoBytes[0] == '\n' && lastTwoBytes[1] == '\r'))) {
237+
markMessageBodyBegin();
238+
} else {
239+
lastTwoBytes[0] = lastTwoBytes[1];
240+
lastTwoBytes[1] = b;
241+
}
242+
}
243+
207244
checkLimits();
208245
}
209246

@@ -220,6 +257,14 @@ public void write(byte[] b, int off, int len) throws IOException {
220257
off += consumeRange;
221258
len -= consumeRange;
222259
}
260+
261+
// see comment on int[] lastTwoBytes
262+
while (messageBodyBeginMark < 0 && len > 0) {
263+
write(b[off]);
264+
off++;
265+
len--;
266+
}
267+
223268
if(recording) {
224269
record(b, off, len);
225270
}
@@ -251,7 +296,7 @@ protected void checkLimits() throws RecorderIOException {
251296
throw new RecorderTimeoutException();
252297
}
253298
// need to throttle reading to hit max configured rate?
254-
if(position/duration > maxRateBytesPerMs) {
299+
if(position/duration >= maxRateBytesPerMs) {
255300
long desiredDuration = position / maxRateBytesPerMs;
256301
try {
257302
Thread.sleep(desiredDuration-duration);
@@ -557,6 +602,18 @@ public long getRemainingLength() {
557602
return maxLength - position;
558603
}
559604

605+
/**
606+
* Forget about anything past the point where the content-body starts. This
607+
* is needed to support FetchHTTP's shouldFetchBody setting. See also the
608+
* docs on {@link #lastTwoBytes}
609+
*/
610+
public void chopAtMessageBodyBegin() {
611+
if (messageBodyBeginMark >= 0) {
612+
this.size = messageBodyBeginMark;
613+
this.position = messageBodyBeginMark;
614+
}
615+
}
616+
560617
public void clearForReuse() throws IOException {
561618
this.out = null;
562619
this.position = 0;

src/main/java/org/archive/io/ReplayInputStream.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -192,11 +192,15 @@ public int read(byte[] b, int off, int len) throws IOException {
192192
}
193193

194194
public void readFullyTo(OutputStream os) throws IOException {
195+
readFullyTo(this, os);
196+
}
197+
198+
public static void readFullyTo(InputStream in, OutputStream os) throws IOException {
195199
byte[] buf = new byte[4096];
196-
int c = read(buf);
200+
int c = in.read(buf);
197201
while (c != -1) {
198202
os.write(buf,0,c);
199-
c = read(buf);
203+
c = in.read(buf);
200204
}
201205
}
202206

@@ -218,12 +222,7 @@ public void readHeaderTo(OutputStream os) throws IOException {
218222
*/
219223
public void readContentTo(OutputStream os) throws IOException {
220224
setToResponseBodyStart();
221-
byte[] buf = new byte[4096];
222-
int c = read(buf);
223-
while (c != -1) {
224-
os.write(buf,0,c);
225-
c = read(buf);
226-
}
225+
readFullyTo(os);
227226
}
228227

229228
/**

0 commit comments

Comments
 (0)