Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>org.archive</groupId>
<artifactId>ia-web-commons</artifactId>
<version>1.1.0</version>
<version>1.1.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>ia-web-commons</name>
Expand Down
88 changes: 77 additions & 11 deletions src/main/java/org/archive/io/RecordingInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,10 @@ public RecordingInputStream(int bufferSize, String backingFilename)
}

public void open(InputStream wrappedStream) throws IOException {
logger.fine(Thread.currentThread().getName() + " opening " +
wrappedStream + ", " + Thread.currentThread().getName());
if (logger.isLoggable(Level.FINE)) {
logger.fine("wrapping " + wrappedStream + " in thread "
+ Thread.currentThread().getName());
}
if(isOpen()) {
// error; should not be opening/wrapping in an unclosed
// stream remains open
Expand Down Expand Up @@ -135,11 +137,11 @@ public int read(byte[] b) throws IOException {

/**
 * Closes the wrapped input stream and the recording output stream.
 *
 * <p>Both streams are closed via {@code IOUtils.closeQuietly}, and the
 * reference to the wrapped stream is cleared in between. The close is
 * logged once, at FINE level, together with the name of the calling
 * thread.
 *
 * @throws IOException declared in the signature; the closes themselves are
 *         performed quietly
 */
public void close() throws IOException {
    if (logger.isLoggable(Level.FINE)) {
        logger.fine("closing " + this.in + " in thread "
                + Thread.currentThread().getName());
    }
    IOUtils.closeQuietly(this.in);
    this.in = null;
    IOUtils.closeQuietly(this.recordingOutputStream);
}

Expand All @@ -159,20 +161,77 @@ public long readFully() throws IOException {
return this.recordingOutputStream.getSize();
}

/**
 * Reads from the wrapped stream until the message body is complete.
 *
 * <p>The count of body bytes starts from what has already been recorded
 * past the message-body-begin mark. Reading continues until that count
 * reaches {@code contentLength}, or until end of stream when
 * {@code contentLength} is zero or negative. Returns silently, without
 * reading, if this stream is not open.
 *
 * <p>A socket timeout does not end the loop: it is logged at FINE, the
 * interrupt flag is checked, and the recorder's limits are re-checked
 * before the next read attempt.
 *
 * @param contentLength expected length of the message body in bytes; if
 *        zero or negative, read until end of stream
 * @throws IOException if a read fails (socket errors are propagated
 *         unchanged) or the recorder's limit check fails
 * @throws InterruptedException if the current thread is found to be
 *         interrupted after a read or after a socket timeout
 */
public void readToEndOfContent(long contentLength)
        throws IOException, InterruptedException {
    // Check we're open before proceeding.
    if (!isOpen()) {
        // TODO: should this be a noisier exception-raising error?
        return;
    }

    final boolean untilEndOfStream = contentLength <= 0;
    long totalBytes = recordingOutputStream.position
            - recordingOutputStream.getMessageBodyBegin();
    while (untilEndOfStream || totalBytes < contentLength) {
        try {
            // read no more than soft max
            long maxToRead = untilEndOfStream
                    ? drainBuffer.length
                    : Math.min(drainBuffer.length, contentLength - totalBytes);
            // nor more than hard max
            maxToRead = Math.min(maxToRead,
                    recordingOutputStream.getRemainingLength());
            // but always at least 1: when nothing remains under the hard
            // max, a read must still happen so that the hard max
            // exception gets triggered
            maxToRead = Math.max(maxToRead, 1);

            int bytesRead = read(drainBuffer, 0, (int) maxToRead);
            if (bytesRead == -1) {
                break;
            }
            totalBytes += bytesRead;

            if (Thread.interrupted()) {
                throw new InterruptedException("Interrupted during IO");
            }
        } catch (SocketTimeoutException e) {
            // A socket timeout is just a transient problem, meaning
            // nothing was available in the configured timeout period,
            // but something else might become available later.
            // Take this opportunity to check the overall
            // timeout (below). One reason for this timeout is
            // servers that keep up the connection, 'keep-alive', even
            // though we asked them to not keep the connection open.
            if (logger.isLoggable(Level.FINE)) {
                logger.log(Level.FINE, "socket timeout", e);
            }
            // check for interrupt
            if (Thread.interrupted()) {
                throw new InterruptedException("Interrupted during IO");
            }
            // check for overall timeout
            recordingOutputStream.checkLimits();
        } catch (SocketException se) {
            // Other socket failures are not transient; propagate as-is.
            throw se;
        } catch (NullPointerException e) {
            // [ 896757 ] NPEs in Andy's Th-Fri Crawl.
            // A crawl was showing NPE's in this part of the code but can
            // not reproduce. Adding this rethrowing catch block w/
            // diagnostics to help should we come across the problem in the
            // future. The original exception is chained as the cause so
            // its stack trace is not lost.
            NullPointerException diagnostic = new NullPointerException(
                    "Stream " + this.in + ", " + e.getMessage() + " "
                    + Thread.currentThread().getName());
            diagnostic.initCause(e);
            throw diagnostic;
        }
    }
}

/**
* Read all of a stream (Or read until we timeout or have read to the max).
* @param softMaxLength Maximum length to read; if zero or < 0, then no
* limit. If met, return normally.
* @param hardMaxLength Maximum length to read; if zero or < 0, then no
* limit. If exceeded, throw RecorderLengthExceededException
* @param timeout Timeout in milliseconds for total read; if zero or
* negative, timeout is <code>Long.MAX_VALUE</code>. If exceeded, throw
* RecorderTimeoutException
* @param maxBytesPerMs How many bytes per millisecond.
* @throws IOException failed read.
* @throws RecorderLengthExceededException
* @throws RecorderTimeoutException
* @throws InterruptedException
* @deprecated
*/
public void readFullyOrUntil(long softMaxLength)
throws IOException, RecorderLengthExceededException,
Expand Down Expand Up @@ -349,6 +408,13 @@ public int getRecordedBufferLength() {
return recordingOutputStream.getBufferLength();
}

/**
 * Delegates to the underlying recording output stream's
 * {@code chopAtMessageBodyBegin()}.
 *
 * <p>See doc on {@link RecordingOutputStream#chopAtMessageBodyBegin()}
 */
public void chopAtMessageBodyBegin() {
recordingOutputStream.chopAtMessageBodyBegin();
}

/**
 * Delegates to the underlying recording output stream's
 * {@code clearForReuse()}.
 *
 * @throws IOException if the delegate call throws it
 */
public void clearForReuse() throws IOException {
recordingOutputStream.clearForReuse();
}
Expand Down
61 changes: 59 additions & 2 deletions src/main/java/org/archive/io/RecordingOutputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public class RecordingOutputStream extends OutputStream {
private byte[] buffer;

/** current virtual position in the recording */
private long position;
long position;

/** flag to disable recording */
private boolean recording;
Expand Down Expand Up @@ -132,6 +132,29 @@ public class RecordingOutputStream extends OutputStream {
*/
protected long messageBodyBeginMark;

/**
 * While {@code messageBodyBeginMark} is not set, the last two bytes seen.
 *
 * <p>
 * This class does automatic detection of the http message body begin (i.e.
 * the end of the http headers). Unfortunately httpcomponents did not want
 * to add functionality to help us with this, see
 * https://issues.apache.org/jira/browse/HTTPCORE-325
 *
 * <p>
 * It works like this: while {@code messageBodyBeginMark} is not set, we
 * remember the last two bytes seen, and look at each byte we write. If
 * lastTwoBytes+currentByte is "\n\r\n", or lastTwoBytes[1]+currentByte is
 * "\n\n", then we call {@code markMessageBodyBegin()} at the position after
 * currentByte.
 *
 * <p>
 * An assumption here is that protocols other than http don't have headers,
 * and for those protocols the user of this class will call
 * {@code markMessageBodyBegin()} at position 0 before writing anything.
 *
 * <p>
 * Both slots start out as -1 (presumably meaning "no byte seen yet").
 */
protected int[] lastTwoBytes = new int[] {-1, -1};

/**
* Stream to record.
*/
Expand Down Expand Up @@ -204,6 +227,20 @@ public void write(int b) throws IOException {
if (this.out != null) {
this.out.write(b);
}

// see comment on int[] lastTwoBytes
if (messageBodyBeginMark < 0l) {
// looking for "\n\n" or "\n\r\n"
if (b == '\n'
&& (lastTwoBytes[1] == '\n'
|| (lastTwoBytes[0] == '\n' && lastTwoBytes[1] == '\r'))) {
markMessageBodyBegin();
} else {
lastTwoBytes[0] = lastTwoBytes[1];
lastTwoBytes[1] = b;
}
}

checkLimits();
}

Expand All @@ -220,6 +257,14 @@ public void write(byte[] b, int off, int len) throws IOException {
off += consumeRange;
len -= consumeRange;
}

// see comment on int[] lastTwoBytes
while (messageBodyBeginMark < 0 && len > 0) {
write(b[off]);
off++;
len--;
}

if(recording) {
record(b, off, len);
}
Expand Down Expand Up @@ -251,7 +296,7 @@ protected void checkLimits() throws RecorderIOException {
throw new RecorderTimeoutException();
}
// need to throttle reading to hit max configured rate?
if(position/duration > maxRateBytesPerMs) {
if(position/duration >= maxRateBytesPerMs) {
long desiredDuration = position / maxRateBytesPerMs;
try {
Thread.sleep(desiredDuration-duration);
Expand Down Expand Up @@ -557,6 +602,18 @@ public long getRemainingLength() {
return maxLength - position;
}

/**
 * Discards whatever was recorded after the start of the content-body by
 * rewinding both the recorded size and the current position to the
 * message-body-begin mark. Does nothing while that mark is unset
 * (negative). This is needed to support FetchHTTP's shouldFetchBody
 * setting. See also the docs on {@link #lastTwoBytes}
 */
public void chopAtMessageBodyBegin() {
    if (messageBodyBeginMark < 0) {
        // no body start known yet, so there is nothing to chop
        return;
    }
    this.position = messageBodyBeginMark;
    this.size = messageBodyBeginMark;
}

public void clearForReuse() throws IOException {
this.out = null;
this.position = 0;
Expand Down
15 changes: 7 additions & 8 deletions src/main/java/org/archive/io/ReplayInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,15 @@ public int read(byte[] b, int off, int len) throws IOException {
}

/**
 * Copies this stream to the given output stream by delegating to
 * {@code readFullyTo(InputStream, OutputStream)} with this stream as the
 * source.
 *
 * @param os destination for the bytes read from this stream
 * @throws IOException if the delegate call throws it
 */
public void readFullyTo(OutputStream os) throws IOException {
readFullyTo(this, os);
}

/**
 * Copies every remaining byte of {@code in} to {@code os}.
 *
 * <p>Reads in chunks of up to 4096 bytes until {@code in} reports end of
 * stream. Neither stream is flushed or closed here; that is left to the
 * caller.
 *
 * @param in source to drain
 * @param os destination for the bytes read
 * @throws IOException if reading from {@code in} or writing to {@code os}
 *         fails
 */
public static void readFullyTo(InputStream in, OutputStream os) throws IOException {
    byte[] buf = new byte[4096];
    int c = in.read(buf);
    while (c != -1) {
        os.write(buf, 0, c);
        c = in.read(buf);
    }
}

Expand All @@ -218,12 +222,7 @@ public void readHeaderTo(OutputStream os) throws IOException {
*/
public void readContentTo(OutputStream os) throws IOException {
    // Reposition first (presumably to the start of the response body, per
    // the method name), then let the shared copy helper drain the rest of
    // this stream into os instead of repeating the copy loop here.
    setToResponseBodyStart();
    readFullyTo(os);
}

/**
Expand Down