Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
0963f2e
fix HER-2059 with commons-httpclient workaround
nlevitt Dec 7, 2013
3a7093a
Update version to 1.1.0 release!
ikreymer Dec 18, 2013
7ff3d4d
Merge branch 'master' of github.com:internetarchive/ia-web-commons
ikreymer Dec 18, 2013
acec0d4
updates for heritrix FetchHTTP using httpcomponents 4.3
nlevitt Jan 11, 2014
7c6c673
fix off-by-one mistake with integer arithmetic so RecordingInputStrea…
nlevitt Jan 11, 2014
8e33ab7
Merge pull request #4 from nlevitt/hc43
kngenie Jan 14, 2014
dcb68a2
get rid of RecyclingFastBufferedOutputStream, which was supposed to a…
nlevitt Jan 25, 2014
b0bfd40
Merge pull request #6 from nlevitt/no-recycling
eldondev Jan 25, 2014
77df672
update org.json dependency, because source jar is available in maven …
nlevitt Feb 1, 2014
338db0c
Merge pull request #7 from nlevitt/umbra
ikreymer Feb 5, 2014
7c15265
set version for final build as ia-web-commons
nlevitt Feb 21, 2014
1d1cbb8
Merge pull request #9 from nlevitt/ia-web-commons-sunset
ikreymer Feb 21, 2014
5e71d11
merge from iipc/master
nlevitt Feb 21, 2014
8acf4f8
Merge pull request #10 from nlevitt/merge-iipc-into-ia
ikreymer Feb 28, 2014
b82f488
remove sonatype parent, add distributionManagement back in
ikreymer Feb 28, 2014
ee31051
add missing newline back
ikreymer Feb 28, 2014
ffb68b8
pom.xml customizations to allow IA and other 3rd parties to tag build…
nlevitt Feb 28, 2014
8b4833c
Merge pull request #11 from nlevitt/unified-pom
ikreymer Feb 28, 2014
d81936c
configure properties so sonatype repositories are defaults for distri…
nlevitt Feb 28, 2014
3d28897
Merge pull request #12 from nlevitt/unified-pom
ikreymer Feb 28, 2014
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@
<version>7</version>
</parent>


<groupId>org.netpreserve.commons</groupId>
<artifactId>webarchive-commons</artifactId>
<version>1.1.1-SNAPSHOT</version>
<version>1.1.1-${build.tag}SNAPSHOT</version>
<packaging>jar</packaging>

<name>webarchive-commons</name>
Expand Down Expand Up @@ -54,6 +53,10 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<build.time>${maven.build.timestamp}</build.time>
<maven.build.timestamp.format>yyyyMMddhhmmss</maven.build.timestamp.format>
<build.tag></build.tag>
<!-- sonatype repositories are defaults for distributionManagement -->
<repository.url>https://oss.sonatype.org/service/local/staging/deploy/maven2/</repository.url>
<snapshotRepository.url>https://oss.sonatype.org/content/repositories/snapshots/</snapshotRepository.url>
</properties>

<dependencies>
Expand All @@ -73,7 +76,7 @@
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20090211</version>
<version>20131018</version>
</dependency>
<dependency>
<groupId>org.htmlparser</groupId>
Expand Down Expand Up @@ -240,4 +243,15 @@

</repositories>

<distributionManagement>
<!-- Deployment targets for release and snapshot artifacts. The URLs are not
     hard-coded here; they are read from the repository.url and
     snapshotRepository.url properties. -->
<repository>
<id>repository</id>
<url>${repository.url}</url>
</repository>
<snapshotRepository>
<id>snapshotRepository</id>
<url>${snapshotRepository.url}</url>
</snapshotRepository>
</distributionManagement>

</project>
12 changes: 12 additions & 0 deletions src/main/java/org/archive/httpclient/HttpRecorderGetMethod.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,16 @@ protected void addProxyConnectionHeader(HttpState state, HttpConnection conn)
super.addProxyConnectionHeader(state, conn);
this.httpRecorderMethod.handleAddProxyConnectionHeader(this);
}

/**
 * Sets the query string, working around HER-2059
 * (see https://webarchive.jira.com/browse/HER-2059).
 *
 * <p>This method is never called with the implied question mark already
 * prepended, so one is added here; commons-httpclient will strip it again
 * later. A {@code null} query string is handed to the superclass unchanged.
 *
 * @param queryString the query string without its leading '?', or
 *        {@code null}
 */
public void setQueryString(String queryString) {
    String adjusted = (queryString == null) ? null : '?' + queryString;
    super.setQueryString(adjusted);
}

}
88 changes: 77 additions & 11 deletions src/main/java/org/archive/io/RecordingInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,10 @@ public RecordingInputStream(int bufferSize, String backingFilename)
}

public void open(InputStream wrappedStream) throws IOException {
logger.fine(Thread.currentThread().getName() + " opening " +
wrappedStream + ", " + Thread.currentThread().getName());
if (logger.isLoggable(Level.FINE)) {
logger.fine("wrapping " + wrappedStream + " in thread "
+ Thread.currentThread().getName());
}
if(isOpen()) {
// error; should not be opening/wrapping in an unclosed
// stream remains open
Expand Down Expand Up @@ -135,11 +137,11 @@ public int read(byte[] b) throws IOException {

public void close() throws IOException {
if (logger.isLoggable(Level.FINE)) {
logger.fine(Thread.currentThread().getName() + " closing " +
this.in + ", " + Thread.currentThread().getName());
logger.fine("closing " + this.in + " in thread "
+ Thread.currentThread().getName());
}
IOUtils.closeQuietly(this.in);
this.in = null;
this.in = null;
IOUtils.closeQuietly(this.recordingOutputStream);
}

Expand All @@ -159,20 +161,77 @@ public long readFully() throws IOException {
return this.recordingOutputStream.getSize();
}

/**
 * Reads (and thereby records) the message body until
 * {@code contentLength} body bytes have been seen or the wrapped stream
 * reports end of stream.
 *
 * <p>The running body-byte count starts at the recording stream's current
 * position minus its message-body-begin offset. Each pass reads at most
 * {@code drainBuffer.length} bytes, further capped by the bytes still
 * missing from {@code contentLength} and by the recording stream's
 * remaining length, but never fewer than one byte.
 *
 * <p>A {@link SocketTimeoutException} does not end the loop: the interrupt
 * flag and the recording stream's limits are checked and reading is then
 * retried. Any other {@link IOException} (including socket errors)
 * propagates to the caller.
 *
 * <p>Returns silently without reading if this stream is not open.
 *
 * @param contentLength expected body length in bytes; if zero or negative,
 *        reading continues until end of stream
 * @throws IOException if a read fails or the recording stream's limit
 *         check fails
 * @throws InterruptedException if the current thread is found to be
 *         interrupted after a read or after a socket timeout
 */
public void readToEndOfContent(long contentLength)
        throws IOException, InterruptedException {
    // Check we're open before proceeding.
    if (!isOpen()) {
        // TODO: should this be a noisier exception-raising error?
        return;
    }

    long totalBytes = recordingOutputStream.position
            - recordingOutputStream.getMessageBodyBegin();
    while (contentLength <= 0 || totalBytes < contentLength) {
        try {
            // read no more than soft max
            long maxToRead = (contentLength <= 0)
                    ? drainBuffer.length
                    : Math.min(drainBuffer.length, contentLength - totalBytes);
            // nor more than hard max
            maxToRead = Math.min(maxToRead,
                    recordingOutputStream.getRemainingLength());
            // but always at least 1, even when the remaining length is
            // zero or negative; presumably so that the recording stream's
            // own limit check fires and raises the hard-max exception
            // instead of this loop spinning on zero-length reads.
            maxToRead = Math.max(maxToRead, 1);

            int bytesRead = read(drainBuffer, 0, (int) maxToRead);
            if (bytesRead == -1) {
                break;
            }
            totalBytes += bytesRead;

            if (Thread.interrupted()) {
                throw new InterruptedException("Interrupted during IO");
            }
        } catch (SocketTimeoutException e) {
            // A socket timeout is just a transient problem, meaning
            // nothing was available in the configured timeout period,
            // but something else might become available later.
            // Take this opportunity to check the overall
            // timeout (below). One reason for this timeout is
            // servers that keep up the connection, 'keep-alive', even
            // though we asked them to not keep the connection open.
            if (logger.isLoggable(Level.FINE)) {
                logger.log(Level.FINE, "socket timeout", e);
            }
            // check for interrupt
            if (Thread.interrupted()) {
                throw new InterruptedException("Interrupted during IO");
            }
            // check for overall timeout
            recordingOutputStream.checkLimits();
        } catch (NullPointerException e) {
            // [ 896757 ] NPEs in Andy's Th-Fri Crawl.
            // A crawl was showing NPE's in this part of the code but can
            // not reproduce. Rethrow with diagnostics to help should we
            // come across the problem in the future, keeping the original
            // exception as the cause so its stack trace is not lost.
            NullPointerException diagnostic = new NullPointerException(
                    "Stream " + this.in + ", " + e.getMessage() + " "
                    + Thread.currentThread().getName());
            diagnostic.initCause(e);
            throw diagnostic;
        }
    }
}

/**
* Read all of a stream (Or read until we timeout or have read to the max).
* @param softMaxLength Maximum length to read; if zero or < 0, then no
* limit. If met, return normally.
* @param hardMaxLength Maximum length to read; if zero or < 0, then no
* limit. If exceeded, throw RecorderLengthExceededException
* @param timeout Timeout in milliseconds for total read; if zero or
* negative, timeout is <code>Long.MAX_VALUE</code>. If exceeded, throw
* RecorderTimeoutException
* @param maxBytesPerMs How many bytes per millisecond.
* @throws IOException failed read.
* @throws RecorderLengthExceededException
* @throws RecorderTimeoutException
* @throws InterruptedException
* @deprecated
*/
public void readFullyOrUntil(long softMaxLength)
throws IOException, RecorderLengthExceededException,
Expand Down Expand Up @@ -349,6 +408,13 @@ public int getRecordedBufferLength() {
return recordingOutputStream.getBufferLength();
}

/**
 * Delegates to the underlying recording output stream; see the
 * documentation on {@link RecordingOutputStream#chopAtMessageBodyBegin()}.
 */
public void chopAtMessageBodyBegin() {
    this.recordingOutputStream.chopAtMessageBodyBegin();
}

/**
 * Delegates to the underlying recording output stream's
 * {@code clearForReuse()}.
 *
 * @throws IOException if the recording output stream's
 *         {@code clearForReuse()} throws one
 */
public void clearForReuse() throws IOException {
    this.recordingOutputStream.clearForReuse();
}
Expand Down
91 changes: 69 additions & 22 deletions src/main/java/org/archive/io/RecordingOutputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import it.unimi.dsi.fastutil.io.FastBufferedOutputStream;

import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
Expand Down Expand Up @@ -84,17 +85,11 @@ public class RecordingOutputStream extends OutputStream {
private byte[] buffer;

/** current virtual position in the recording */
private long position;
long position;

/** flag to disable recording */
private boolean recording;

/**
* Reusable buffer for FastBufferedOutputStream
*/
protected byte[] bufStreamBuf =
new byte [ FastBufferedOutputStream.DEFAULT_BUFFER_SIZE ];

/**
* True if we're to digest content.
*/
Expand Down Expand Up @@ -132,6 +127,29 @@ public class RecordingOutputStream extends OutputStream {
*/
protected long messageBodyBeginMark;

/**
* While messageBodyBeginMark is not set, the last two bytes seen.
*
* <p>
* This class does automatic detection of http message body begin (i.e. end
* of http headers). Unfortunately httpcomponents did not want to add
* functionality to help us with this, see
* https://issues.apache.org/jira/browse/HTTPCORE-325
*
* <p>
* It works like this: while messageBodyBeginMark is not set, we remember
* the last two bytes seen, and look at each byte we write. If the
* lastTwoBytes+currentByte is "\n\r\n", or lastTwoBytes[1]+currentByte is
* "\n\n" then we call markMessageBodyBegin() at the position after
* currentByte.
*
* <p>
* An assumption here is that protocols other than http don't have headers,
* and for those protocols the user of this class will call
* markMessageBodyBegin() at position 0 before writing anything.
*/
protected int[] lastTwoBytes = new int[] {-1, -1};

/**
* Stream to record.
*/
Expand Down Expand Up @@ -183,15 +201,18 @@ public void open(OutputStream wrappedStream) throws IOException {
}
clearForReuse();
this.out = wrappedStream;
startTime = System.currentTimeMillis();
}

protected OutputStream ensureDiskStream() throws FileNotFoundException {
if (this.diskStream == null) {
// TODO: Fix so we only make file when its actually needed.
FileOutputStream fis = new FileOutputStream(this.backingFilename);

this.diskStream = new RecyclingFastBufferedOutputStream(fis, bufStreamBuf);
this.diskStream = new FastBufferedOutputStream(fis);
}
startTime = System.currentTimeMillis();
return this.diskStream;
}


public void write(int b) throws IOException {
if(position<maxPosition) {
// revisiting previous content; do nothing but advance position
Expand All @@ -204,6 +225,20 @@ public void write(int b) throws IOException {
if (this.out != null) {
this.out.write(b);
}

// see comment on int[] lastTwoBytes
if (messageBodyBeginMark < 0l) {
// looking for "\n\n" or "\n\r\n"
if (b == '\n'
&& (lastTwoBytes[1] == '\n'
|| (lastTwoBytes[0] == '\n' && lastTwoBytes[1] == '\r'))) {
markMessageBodyBegin();
} else {
lastTwoBytes[0] = lastTwoBytes[1];
lastTwoBytes[1] = b;
}
}

checkLimits();
}

Expand All @@ -220,6 +255,14 @@ public void write(byte[] b, int off, int len) throws IOException {
off += consumeRange;
len -= consumeRange;
}

// see comment on int[] lastTwoBytes
while (messageBodyBeginMark < 0 && len > 0) {
write(b[off]);
off++;
len--;
}

if(recording) {
record(b, off, len);
}
Expand Down Expand Up @@ -251,7 +294,7 @@ protected void checkLimits() throws RecorderIOException {
throw new RecorderTimeoutException();
}
// need to throttle reading to hit max configured rate?
if(position/duration > maxRateBytesPerMs) {
if(position/duration >= maxRateBytesPerMs) {
long desiredDuration = position / maxRateBytesPerMs;
try {
Thread.sleep(desiredDuration-duration);
Expand All @@ -274,10 +317,7 @@ private void record(int b) throws IOException {
this.digest.update((byte)b);
}
if (this.position >= this.buffer.length) {
// TODO: Its possible to call write w/o having first opened a
// stream. Protect ourselves against this.
assert this.diskStream != null: "Diskstream is null";
this.diskStream.write(b);
this.ensureDiskStream().write(b);
} else {
this.buffer[(int) this.position] = (byte) b;
}
Expand Down Expand Up @@ -312,12 +352,7 @@ private void record(byte[] b, int off, int len) throws IOException {
*/
private void tailRecord(byte[] b, int off, int len) throws IOException {
if(this.position >= this.buffer.length){
// TODO: Its possible to call write w/o having first opened a
// stream. Lets protect ourselves against this.
if (this.diskStream == null) {
throw new IOException("diskstream is null");
}
this.diskStream.write(b, off, len);
this.ensureDiskStream().write(b, off, len);
this.position += len;
} else {
assert this.buffer != null: "Buffer is null";
Expand Down Expand Up @@ -557,6 +592,18 @@ public long getRemainingLength() {
return maxLength - position;
}

/**
 * Discards everything recorded past the start of the content body by
 * resetting both the recorded size and the current position to the
 * message-body-begin mark. Does nothing if that mark has not been set
 * (i.e. it is negative).
 *
 * <p>This is needed to support FetchHTTP's shouldFetchBody setting. See
 * also the docs on {@link #lastTwoBytes}.
 */
public void chopAtMessageBodyBegin() {
    if (messageBodyBeginMark < 0) {
        // no body start recorded; nothing to chop
        return;
    }
    long bodyStart = messageBodyBeginMark;
    this.position = bodyStart;
    this.size = bodyStart;
}

public void clearForReuse() throws IOException {
this.out = null;
this.position = 0;
Expand Down

This file was deleted.

Loading