diff --git a/pom.xml b/pom.xml index cfd201b0..9c8698c7 100644 --- a/pom.xml +++ b/pom.xml @@ -7,10 +7,9 @@ 7 - org.netpreserve.commons webarchive-commons - 1.1.1-SNAPSHOT + 1.1.1-${build.tag}SNAPSHOT jar webarchive-commons @@ -54,6 +53,10 @@ UTF-8 ${maven.build.timestamp} yyyyMMddhhmmss + + + https://oss.sonatype.org/service/local/staging/deploy/maven2/ + https://oss.sonatype.org/content/repositories/snapshots/ @@ -73,7 +76,7 @@ org.json json - 20090211 + 20131018 org.htmlparser @@ -240,4 +243,15 @@ + + + repository + ${repository.url} + + + snapshotRepository + ${snapshotRepository.url} + + + diff --git a/src/main/java/org/archive/httpclient/HttpRecorderGetMethod.java b/src/main/java/org/archive/httpclient/HttpRecorderGetMethod.java index 105c4f7e..ef241b48 100644 --- a/src/main/java/org/archive/httpclient/HttpRecorderGetMethod.java +++ b/src/main/java/org/archive/httpclient/HttpRecorderGetMethod.java @@ -117,4 +117,16 @@ protected void addProxyConnectionHeader(HttpState state, HttpConnection conn) super.addProxyConnectionHeader(state, conn); this.httpRecorderMethod.handleAddProxyConnectionHeader(this); } + + // XXX see https://webarchive.jira.com/browse/HER-2059 + // We never call this method with the implied question mark prepended, so + // adding it does the trick, since commons-httpclient will strip it later. + public void setQueryString(String queryString) { + if (queryString != null) { + super.setQueryString('?' 
+ queryString); + } else { + super.setQueryString(queryString); + } + } + } diff --git a/src/main/java/org/archive/io/RecordingInputStream.java b/src/main/java/org/archive/io/RecordingInputStream.java index b46905ed..36f539a9 100644 --- a/src/main/java/org/archive/io/RecordingInputStream.java +++ b/src/main/java/org/archive/io/RecordingInputStream.java @@ -74,8 +74,10 @@ public RecordingInputStream(int bufferSize, String backingFilename) } public void open(InputStream wrappedStream) throws IOException { - logger.fine(Thread.currentThread().getName() + " opening " + - wrappedStream + ", " + Thread.currentThread().getName()); + if (logger.isLoggable(Level.FINE)) { + logger.fine("wrapping " + wrappedStream + " in thread " + + Thread.currentThread().getName()); + } if(isOpen()) { // error; should not be opening/wrapping in an unclosed // stream remains open @@ -135,11 +137,11 @@ public int read(byte[] b) throws IOException { public void close() throws IOException { if (logger.isLoggable(Level.FINE)) { - logger.fine(Thread.currentThread().getName() + " closing " + - this.in + ", " + Thread.currentThread().getName()); + logger.fine("closing " + this.in + " in thread " + + Thread.currentThread().getName()); } IOUtils.closeQuietly(this.in); - this.in = null; + this.in = null; IOUtils.closeQuietly(this.recordingOutputStream); } @@ -159,20 +161,77 @@ public long readFully() throws IOException { return this.recordingOutputStream.getSize(); } + public void readToEndOfContent(long contentLength) + throws IOException, InterruptedException { + // Check we're open before proceeding. + if (!isOpen()) { + // TODO: should this be a noisier exception-raising error? + return; + } + + long totalBytes = recordingOutputStream.position - recordingOutputStream.getMessageBodyBegin(); + long bytesRead = -1L; + long maxToRead = -1; + while (contentLength <= 0 || totalBytes < contentLength) { + try { + // read no more than soft max + maxToRead = (contentLength <= 0) + ? 
drainBuffer.length + : Math.min(drainBuffer.length, contentLength - totalBytes); + // nor more than hard max + maxToRead = Math.min(maxToRead, recordingOutputStream.getRemainingLength()); + // but always at least 1 (to trigger hard max exception) XXX wtf is this? + maxToRead = Math.max(maxToRead, 1); + + bytesRead = read(drainBuffer,0,(int)maxToRead); + if (bytesRead == -1) { + break; + } + totalBytes += bytesRead; + + if (Thread.interrupted()) { + throw new InterruptedException("Interrupted during IO"); + } + } catch (SocketTimeoutException e) { + // A socket timeout is just a transient problem, meaning + // nothing was available in the configured timeout period, + // but something else might become available later. + // Take this opportunity to check the overall + // timeout (below). One reason for this timeout is + // servers that keep up the connection, 'keep-alive', even + // though we asked them to not keep the connection open. + if (logger.isLoggable(Level.FINE)) { + logger.log(Level.FINE, "socket timeout", e); + } + // check for interrupt + if (Thread.interrupted()) { + throw new InterruptedException("Interrupted during IO"); + } + // check for overall timeout + recordingOutputStream.checkLimits(); + } catch (SocketException se) { + throw se; + } catch (NullPointerException e) { + // [ 896757 ] NPEs in Andy's Th-Fri Crawl. + // A crawl was showing NPE's in this part of the code but can + // not reproduce. Adding this rethrowing catch block w/ + // diagnostics to help should we come across the problem in the + // future. + throw new NullPointerException("Stream " + this.in + ", " + + e.getMessage() + " " + Thread.currentThread().getName()); + } + } + } + /** * Read all of a stream (Or read until we timeout or have read to the max). * @param softMaxLength Maximum length to read; if zero or < 0, then no * limit. If met, return normally. - * @param hardMaxLength Maximum length to read; if zero or < 0, then no - * limit. 
If exceeded, throw RecorderLengthExceededException - * @param timeout Timeout in milliseconds for total read; if zero or - * negative, timeout is Long.MAX_VALUE. If exceeded, throw - * RecorderTimeoutException - * @param maxBytesPerMs How many bytes per millisecond. * @throws IOException failed read. * @throws RecorderLengthExceededException * @throws RecorderTimeoutException * @throws InterruptedException + * @deprecated */ public void readFullyOrUntil(long softMaxLength) throws IOException, RecorderLengthExceededException, @@ -349,6 +408,13 @@ public int getRecordedBufferLength() { return recordingOutputStream.getBufferLength(); } + /** + * See doc on {@link RecordingOutputStream#chopAtMessageBodyBegin()} + */ + public void chopAtMessageBodyBegin() { + recordingOutputStream.chopAtMessageBodyBegin(); + } + public void clearForReuse() throws IOException { recordingOutputStream.clearForReuse(); } diff --git a/src/main/java/org/archive/io/RecordingOutputStream.java b/src/main/java/org/archive/io/RecordingOutputStream.java index 4d0713da..fe05701c 100644 --- a/src/main/java/org/archive/io/RecordingOutputStream.java +++ b/src/main/java/org/archive/io/RecordingOutputStream.java @@ -21,6 +21,7 @@ import it.unimi.dsi.fastutil.io.FastBufferedOutputStream; +import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; @@ -84,17 +85,11 @@ public class RecordingOutputStream extends OutputStream { private byte[] buffer; /** current virtual position in the recording */ - private long position; + long position; /** flag to disable recording */ private boolean recording; - /** - * Reusable buffer for FastBufferedOutputStream - */ - protected byte[] bufStreamBuf = - new byte [ FastBufferedOutputStream.DEFAULT_BUFFER_SIZE ]; - /** * True if we're to digest content. 
*/ @@ -132,6 +127,29 @@ public class RecordingOutputStream extends OutputStream { */ protected long messageBodyBeginMark; + /** + * While messageBodyBeginMark is not set, the last two bytes seen. + * + *

+ * This class automatically detects where the HTTP message body begins (i.e. the end + * of the HTTP headers). Unfortunately HttpComponents declined to add + * functionality to help us with this; see + * https://issues.apache.org/jira/browse/HTTPCORE-325 + * + *

+ * It works like this: while messageBodyBeginMark is not set, we remember + * the last two bytes seen and examine each byte as it is written. If + * lastTwoBytes+currentByte is "\n\r\n", or lastTwoBytes[1]+currentByte is + * "\n\n", then we call markMessageBodyBegin() at the position just after + * currentByte. + * + *

+ * An assumption here is that protocols other than http don't have headers, + * and for those protocols the user of this class will call + * markMessageBodyBegin() at position 0 before writing anything. + */ + protected int[] lastTwoBytes = new int[] {-1, -1}; + /** * Stream to record. */ @@ -183,15 +201,18 @@ public void open(OutputStream wrappedStream) throws IOException { } clearForReuse(); this.out = wrappedStream; + startTime = System.currentTimeMillis(); + } + + protected OutputStream ensureDiskStream() throws FileNotFoundException { if (this.diskStream == null) { - // TODO: Fix so we only make file when its actually needed. FileOutputStream fis = new FileOutputStream(this.backingFilename); - - this.diskStream = new RecyclingFastBufferedOutputStream(fis, bufStreamBuf); + this.diskStream = new FastBufferedOutputStream(fis); } - startTime = System.currentTimeMillis(); + return this.diskStream; } + public void write(int b) throws IOException { if(position 0) { + write(b[off]); + off++; + len--; + } + if(recording) { record(b, off, len); } @@ -251,7 +294,7 @@ protected void checkLimits() throws RecorderIOException { throw new RecorderTimeoutException(); } // need to throttle reading to hit max configured rate? - if(position/duration > maxRateBytesPerMs) { + if(position/duration >= maxRateBytesPerMs) { long desiredDuration = position / maxRateBytesPerMs; try { Thread.sleep(desiredDuration-duration); @@ -274,10 +317,7 @@ private void record(int b) throws IOException { this.digest.update((byte)b); } if (this.position >= this.buffer.length) { - // TODO: Its possible to call write w/o having first opened a - // stream. Protect ourselves against this. 
- assert this.diskStream != null: "Diskstream is null"; - this.diskStream.write(b); + this.ensureDiskStream().write(b); } else { this.buffer[(int) this.position] = (byte) b; } @@ -312,12 +352,7 @@ private void record(byte[] b, int off, int len) throws IOException { */ private void tailRecord(byte[] b, int off, int len) throws IOException { if(this.position >= this.buffer.length){ - // TODO: Its possible to call write w/o having first opened a - // stream. Lets protect ourselves against this. - if (this.diskStream == null) { - throw new IOException("diskstream is null"); - } - this.diskStream.write(b, off, len); + this.ensureDiskStream().write(b, off, len); this.position += len; } else { assert this.buffer != null: "Buffer is null"; @@ -557,6 +592,18 @@ public long getRemainingLength() { return maxLength - position; } + /** + * Forget about anything past the point where the content-body starts. This + * is needed to support FetchHTTP's shouldFetchBody setting. See also the + * docs on {@link #lastTwoBytes} + */ + public void chopAtMessageBodyBegin() { + if (messageBodyBeginMark >= 0) { + this.size = messageBodyBeginMark; + this.position = messageBodyBeginMark; + } + } + public void clearForReuse() throws IOException { this.out = null; this.position = 0; diff --git a/src/main/java/org/archive/io/RecyclingFastBufferedOutputStream.java b/src/main/java/org/archive/io/RecyclingFastBufferedOutputStream.java deleted file mode 100644 index a3b76e46..00000000 --- a/src/main/java/org/archive/io/RecyclingFastBufferedOutputStream.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * This file is part of the Heritrix web crawler (crawler.archive.org). - * - * Licensed to the Internet Archive (IA) by one or more individual - * contributors. - * - * The IA licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.archive.io; - -import it.unimi.dsi.fastutil.io.FastBufferedOutputStream; - -import java.io.OutputStream; - -/** - * FastBufferedOutputStream that accepts a passed-in buffer (avoiding - * reallocation). - */ -public class RecyclingFastBufferedOutputStream extends FastBufferedOutputStream { - public RecyclingFastBufferedOutputStream( final OutputStream os, final byte[] buffer ) { - super(os); - this.buffer = buffer; - avail = buffer.length; - } -} - - diff --git a/src/main/java/org/archive/io/ReplayInputStream.java b/src/main/java/org/archive/io/ReplayInputStream.java index fccf5fd3..35ea8175 100644 --- a/src/main/java/org/archive/io/ReplayInputStream.java +++ b/src/main/java/org/archive/io/ReplayInputStream.java @@ -192,11 +192,15 @@ public int read(byte[] b, int off, int len) throws IOException { } public void readFullyTo(OutputStream os) throws IOException { + readFullyTo(this, os); + } + + public static void readFullyTo(InputStream in, OutputStream os) throws IOException { byte[] buf = new byte[4096]; - int c = read(buf); + int c = in.read(buf); while (c != -1) { os.write(buf,0,c); - c = read(buf); + c = in.read(buf); } } @@ -218,12 +222,7 @@ public void readHeaderTo(OutputStream os) throws IOException { */ public void readContentTo(OutputStream os) throws IOException { setToResponseBodyStart(); - byte[] buf = new byte[4096]; - int c = read(buf); - while (c != -1) { - os.write(buf,0,c); - c = read(buf); - } + readFullyTo(os); } /** diff --git a/src/main/java/org/archive/io/WriterPoolMember.java 
b/src/main/java/org/archive/io/WriterPoolMember.java index 6ea6b295..85d44e5d 100644 --- a/src/main/java/org/archive/io/WriterPoolMember.java +++ b/src/main/java/org/archive/io/WriterPoolMember.java @@ -19,6 +19,8 @@ package org.archive.io; +import it.unimi.dsi.fastutil.io.FastBufferedOutputStream; + import java.io.File; import java.io.FileOutputStream; import java.io.IOException; @@ -80,9 +82,6 @@ public abstract class WriterPoolMember implements ArchiveFileConstants { /** Counting stream for metering */ protected MiserOutputStream countOut = null; - /** reusable buffer for recycling scenarios */ - protected byte[] rebuf; - protected WriterPoolSettings settings; private final String extension; @@ -209,10 +208,7 @@ protected String createFile(final File file) throws IOException { close(); this.f = file; FileOutputStream fos = new FileOutputStream(this.f); - if(rebuf==null) { - rebuf = new byte[settings.getWriteBufferSize()]; - } - this.countOut = new MiserOutputStream(new RecyclingFastBufferedOutputStream(fos,rebuf),settings.getFrequentFlushes()); + this.countOut = new MiserOutputStream(new FastBufferedOutputStream(fos),settings.getFrequentFlushes()); this.out = this.countOut; logger.fine("Opened " + this.f.getAbsolutePath()); return this.f.getName();