diff --git a/src/changes/changes.xml b/src/changes/changes.xml index ecee3aeca01..3aa41940132 100644 --- a/src/changes/changes.xml +++ b/src/changes/changes.xml @@ -56,6 +56,7 @@ The type attribute can be add,update,fix,remove. IOUtils.toByteArray(InputStream) now throws IOException on byte array overflow. Javadoc general improvements. IOUtils.toByteArray() now throws EOFException when not enough data is available #796. + Fix IOUtils.skip() usage in concurrent scenarios. FileUtils#byteCountToDisplaySize() supports Zettabyte, Yottabyte, Ronnabyte and Quettabyte #763. Add org.apache.commons.io.FileUtils.ONE_RB #763. diff --git a/src/main/java/org/apache/commons/io/CopyUtils.java b/src/main/java/org/apache/commons/io/CopyUtils.java index e22adef83be..fdb137d2039 100644 --- a/src/main/java/org/apache/commons/io/CopyUtils.java +++ b/src/main/java/org/apache/commons/io/CopyUtils.java @@ -278,14 +278,18 @@ public static int copy( final Reader input, final Writer output) throws IOException { - final char[] buffer = IOUtils.getScratchCharArray(); - int count = 0; - int n; - while (EOF != (n = input.read(buffer))) { - output.write(buffer, 0, n); - count += n; + final char[] buffer = IOUtils.ScratchBufferHolder.getScratchCharArray(); + try { + int count = 0; + int n; + while (EOF != (n = input.read(buffer))) { + output.write(buffer, 0, n); + count += n; + } + return count; + } finally { + IOUtils.ScratchBufferHolder.releaseScratchCharArray(buffer); } - return count; } /** diff --git a/src/main/java/org/apache/commons/io/IOUtils.java b/src/main/java/org/apache/commons/io/IOUtils.java index a1f9b0709a2..717be4b49bf 100644 --- a/src/main/java/org/apache/commons/io/IOUtils.java +++ b/src/main/java/org/apache/commons/io/IOUtils.java @@ -132,6 +132,94 @@ public class IOUtils { // Writer. Each method should take at least one of these as a parameter, // or return one of them. + /** + * Holder for per-thread internal scratch buffers. + * + *

Buffers are created lazily and reused within the same thread to reduce allocation overhead. In the rare case of reentrant access, a temporary buffer + * is allocated to avoid data corruption.

+ * + *

Typical usage:

+ * + *
{@code
+     * final byte[] buffer = ScratchBufferHolder.getScratchByteArray();
+     * try {
+     *     // use the buffer
+     * } finally {
+     *     ScratchBufferHolder.releaseScratchByteArray(buffer);
+     * }
+     * }
+ */ + static final class ScratchBufferHolder { + + /** + * Holder for internal byte array buffer. + */ + private static final ThreadLocal SCRATCH_BYTE_BUFFER_HOLDER = ThreadLocal.withInitial(() -> new Object[] { false, byteArray() }); + + /** + * Holder for internal char array buffer. + */ + private static final ThreadLocal SCRATCH_CHAR_BUFFER_HOLDER = ThreadLocal.withInitial(() -> new Object[] { false, charArray() }); + + + /** + * Gets the internal byte array buffer. + * + * @return the internal byte array buffer. + */ + static byte[] getScratchByteArray() { + final Object[] holder = SCRATCH_BYTE_BUFFER_HOLDER.get(); + // If already in use, return a new array + if ((boolean) holder[0]) { + return byteArray(); + } + holder[0] = true; + return (byte[]) holder[1]; + } + + /** + * Gets the char array buffer. + * + * @return the char array buffer. + */ + static char[] getScratchCharArray() { + final Object[] holder = SCRATCH_CHAR_BUFFER_HOLDER.get(); + // If already in use, return a new array + if ((boolean) holder[0]) { + return charArray(); + } + holder[0] = true; + return (char[]) holder[1]; + } + + + /** + * If the argument is the internal byte array, release it for reuse. + * + * @param array the byte array to release. + */ + static void releaseScratchByteArray(byte[] array) { + final Object[] holder = SCRATCH_BYTE_BUFFER_HOLDER.get(); + if (array == holder[1]) { + Arrays.fill(array, (byte) 0); + holder[0] = false; + } + } + + /** + * If the argument is the internal char array, release it for reuse. + * + * @param array the char array to release. + */ + static void releaseScratchCharArray(char[] array) { + final Object[] holder = SCRATCH_CHAR_BUFFER_HOLDER.get(); + if (array == holder[1]) { + Arrays.fill(array, (char) 0); + holder[0] = false; + } + } + } + /** * CR char '{@value}'. * @@ -201,26 +289,6 @@ public class IOUtils { */ public static final String LINE_SEPARATOR_WINDOWS = StandardLineSeparator.CRLF.getString(); - /** - * Internal byte array buffer, intended for both reading and writing. - */ - private static final ThreadLocal SCRATCH_BYTE_BUFFER_RW = ThreadLocal.withInitial(IOUtils::byteArray); - - /** - * Internal byte array buffer, intended for write only operations. - */ - private static final byte[] SCRATCH_BYTE_BUFFER_WO = byteArray(); - - /** - * Internal char array buffer, intended for both reading and writing. - */ - private static final ThreadLocal SCRATCH_CHAR_BUFFER_RW = ThreadLocal.withInitial(IOUtils::charArray); - - /** - * Internal char array buffer, intended for write only operations. - */ - private static final char[] SCRATCH_CHAR_BUFFER_WO = charArray(); - /** * The maximum size of an array in many Java VMs. *

@@ -592,10 +660,8 @@ static void checkFromToIndex(final int fromIndex, final int toIndex, final int l * @see IO#clear() */ static void clear() { - SCRATCH_BYTE_BUFFER_RW.remove(); - SCRATCH_CHAR_BUFFER_RW.remove(); - Arrays.fill(SCRATCH_BYTE_BUFFER_WO, (byte) 0); - Arrays.fill(SCRATCH_CHAR_BUFFER_WO, (char) 0); + ScratchBufferHolder.SCRATCH_BYTE_BUFFER_HOLDER.remove(); + ScratchBufferHolder.SCRATCH_CHAR_BUFFER_HOLDER.remove(); } /** @@ -1139,39 +1205,43 @@ public static boolean contentEquals(final Reader input1, final Reader input2) th } // reuse one - final char[] array1 = getScratchCharArray(); + final char[] array1 = ScratchBufferHolder.getScratchCharArray(); // but allocate another final char[] array2 = charArray(); int pos1; int pos2; int count1; int count2; - while (true) { - pos1 = 0; - pos2 = 0; - for (int index = 0; index < DEFAULT_BUFFER_SIZE; index++) { - if (pos1 == index) { - do { - count1 = input1.read(array1, pos1, DEFAULT_BUFFER_SIZE - pos1); - } while (count1 == 0); - if (count1 == EOF) { - return pos2 == index && input2.read() == EOF; + try { + while (true) { + pos1 = 0; + pos2 = 0; + for (int index = 0; index < DEFAULT_BUFFER_SIZE; index++) { + if (pos1 == index) { + do { + count1 = input1.read(array1, pos1, DEFAULT_BUFFER_SIZE - pos1); + } while (count1 == 0); + if (count1 == EOF) { + return pos2 == index && input2.read() == EOF; + } + pos1 += count1; } - pos1 += count1; - } - if (pos2 == index) { - do { - count2 = input2.read(array2, pos2, DEFAULT_BUFFER_SIZE - pos2); - } while (count2 == 0); - if (count2 == EOF) { - return pos1 == index && input1.read() == EOF; + if (pos2 == index) { + do { + count2 = input2.read(array2, pos2, DEFAULT_BUFFER_SIZE - pos2); + } while (count2 == 0); + if (count2 == EOF) { + return pos1 == index && input1.read() == EOF; + } + pos2 += count2; + } + if (array1[index] != array2[index]) { + return false; } - pos2 += count2; - } - if (array1[index] != array2[index]) { - return false; } } + } finally { + ScratchBufferHolder.releaseScratchCharArray(array1); } } @@ -1651,9 +1721,13 @@ public static long copyLarge(final InputStream inputStream, final OutputStream o * @throws IOException if an I/O error occurs. * @since 2.2 */ - public static long copyLarge(final InputStream input, final OutputStream output, final long inputOffset, - final long length) throws IOException { - return copyLarge(input, output, inputOffset, length, getScratchByteArray()); + public static long copyLarge(final InputStream input, final OutputStream output, final long inputOffset, final long length) throws IOException { + final byte[] buffer = ScratchBufferHolder.getScratchByteArray(); + try { + return copyLarge(input, output, inputOffset, length, buffer); + } finally { + ScratchBufferHolder.releaseScratchByteArray(buffer); + } } /** @@ -1723,7 +1797,12 @@ public static long copyLarge(final InputStream input, final OutputStream output, * @since 1.3 */ public static long copyLarge(final Reader reader, final Writer writer) throws IOException { - return copyLarge(reader, writer, getScratchCharArray()); + final char[] buffer = ScratchBufferHolder.getScratchCharArray(); + try { + return copyLarge(reader, writer, buffer); + } finally { + ScratchBufferHolder.releaseScratchCharArray(buffer); + } } /** @@ -1770,7 +1849,12 @@ public static long copyLarge(final Reader reader, final Writer writer, final cha * @since 2.2 */ public static long copyLarge(final Reader reader, final Writer writer, final long inputOffset, final long length) throws IOException { - return copyLarge(reader, writer, inputOffset, length, getScratchCharArray()); + final char[] buffer = ScratchBufferHolder.getScratchCharArray(); + try { + return copyLarge(reader, writer, inputOffset, length, buffer); + } finally { + ScratchBufferHolder.releaseScratchCharArray(buffer); + } } /** @@ -1837,64 +1921,6 @@ static UnsynchronizedByteArrayOutputStream copyToOutputStream( } } - /** - * Fills the given array with 0s. - * - * @param arr The non-null array to fill. - * @return The given array. - */ - private static byte[] fill0(final byte[] arr) { - Arrays.fill(arr, (byte) 0); - return arr; - } - - /** - * Fills the given array with 0s. - * - * @param arr The non-null array to fill. - * @return The given array. - */ - private static char[] fill0(final char[] arr) { - Arrays.fill(arr, (char) 0); - return arr; - } - - /** - * Gets the internal byte array buffer, intended for both reading and writing. - * - * @return the internal byte array buffer, intended for both reading and writing. - */ - static byte[] getScratchByteArray() { - return fill0(SCRATCH_BYTE_BUFFER_RW.get()); - } - - /** - * Gets the internal byte array intended for write only operations. - * - * @return the internal byte array intended for write only operations. - */ - static byte[] getScratchByteArrayWriteOnly() { - return fill0(SCRATCH_BYTE_BUFFER_WO); - } - - /** - * Gets the char byte array buffer, intended for both reading and writing. - * - * @return the char byte array buffer, intended for both reading and writing. - */ - static char[] getScratchCharArray() { - return fill0(SCRATCH_CHAR_BUFFER_RW.get()); - } - - /** - * Gets the internal char array intended for write only operations. - * - * @return the internal char array intended for write only operations. - */ - static char[] getScratchCharArrayWriteOnly() { - return fill0(SCRATCH_CHAR_BUFFER_WO); - } - /** * Returns the length of the given array in a null-safe manner. * @@ -2527,7 +2553,12 @@ public static URL resourceToURL(final String name, final ClassLoader classLoader * @since 2.0 */ public static long skip(final InputStream input, final long skip) throws IOException { - return skip(input, skip, IOUtils::getScratchByteArrayWriteOnly); + final byte[] buffer = ScratchBufferHolder.getScratchByteArray(); + try { + return skip(input, skip, () -> buffer); + } finally { + ScratchBufferHolder.releaseScratchByteArray(buffer); + } } /** @@ -2634,14 +2665,18 @@ public static long skip(final Reader reader, final long toSkip) throws IOExcepti throw new IllegalArgumentException("Skip count must be non-negative, actual: " + toSkip); } long remain = toSkip; - while (remain > 0) { - // See https://issues.apache.org/jira/browse/IO-203 for why we use read() rather than delegating to skip() - final char[] charArray = getScratchCharArrayWriteOnly(); - final long n = reader.read(charArray, 0, (int) Math.min(remain, charArray.length)); - if (n < 0) { // EOF - break; + final char[] charArray = ScratchBufferHolder.getScratchCharArray(); + try { + while (remain > 0) { + // See https://issues.apache.org/jira/browse/IO-203 for why we use read() rather than delegating to skip() + final long n = reader.read(charArray, 0, (int) Math.min(remain, charArray.length)); + if (n < 0) { // EOF + break; + } + remain -= n; } - remain -= n; + } finally { + ScratchBufferHolder.releaseScratchCharArray(charArray); } return toSkip - remain; } @@ -2667,7 +2702,7 @@ public static long skip(final Reader reader, final long toSkip) throws IOExcepti * @since 2.0 */ public static void skipFully(final InputStream input, final long toSkip) throws IOException { - final long skipped = skip(input, toSkip, IOUtils::getScratchByteArrayWriteOnly); + final long skipped = skip(input, toSkip); if (skipped != toSkip) { throw new EOFException("Bytes to skip: " + toSkip + " actual: " + skipped); } diff --git a/src/test/java/org/apache/commons/io/IOCaseTest.java b/src/test/java/org/apache/commons/io/IOCaseTest.java index 97a990f6369..1121b38eeeb 100644 --- a/src/test/java/org/apache/commons/io/IOCaseTest.java +++ b/src/test/java/org/apache/commons/io/IOCaseTest.java @@ -18,6 +18,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotSame; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -296,34 +297,50 @@ void test_getName() { @Test void test_getScratchByteArray() { - final byte[] array = IOUtils.getScratchByteArray(); - assert0(array); - Arrays.fill(array, (byte) 1); - assert0(IOUtils.getScratchCharArray()); - } - - @Test - void test_getScratchByteArrayWriteOnly() { - final byte[] array = IOUtils.getScratchByteArrayWriteOnly(); - assert0(array); - Arrays.fill(array, (byte) 1); - assert0(IOUtils.getScratchCharArray()); + final byte[] array = IOUtils.ScratchBufferHolder.getScratchByteArray(); + try { + assert0(array); + Arrays.fill(array, (byte) 1); + // Get another array, while the first is still in use + final byte[] array2 = IOUtils.ScratchBufferHolder.getScratchByteArray(); + assert0(array2); + assertNotSame(array, array2); + } finally { + // Release first array + IOUtils.ScratchBufferHolder.releaseScratchByteArray(array); + } + // The first array should be reset and reusable + final byte[] array3 = IOUtils.ScratchBufferHolder.getScratchByteArray(); + try { + assert0(array3); + assertSame(array, array3); + } finally { + IOUtils.ScratchBufferHolder.releaseScratchByteArray(array3); + } } @Test void test_getScratchCharArray() { - final char[] array = IOUtils.getScratchCharArray(); - assert0(array); - Arrays.fill(array, (char) 1); - assert0(IOUtils.getScratchCharArray()); - } - - @Test - void test_getScratchCharArrayWriteOnly() { - final char[] array = IOUtils.getScratchCharArrayWriteOnly(); - assert0(array); - Arrays.fill(array, (char) 1); - assert0(IOUtils.getScratchCharArray()); + final char[] array = IOUtils.ScratchBufferHolder.getScratchCharArray(); + try { + assert0(array); + Arrays.fill(array, (char) 1); + // Get another array, while the first is still in use + final char[] array2 = IOUtils.ScratchBufferHolder.getScratchCharArray(); + assert0(array2); + assertNotSame(array, array2); + } finally { + // Release first array + IOUtils.ScratchBufferHolder.releaseScratchCharArray(array); + } + // The first array should be reset and reusable + final char[] array3 = IOUtils.ScratchBufferHolder.getScratchCharArray(); + try { + assert0(array3); + assertSame(array, array3); + } finally { + IOUtils.ScratchBufferHolder.releaseScratchCharArray(array3); + } } @Test diff --git a/src/test/java/org/apache/commons/io/IOUtilsConcurrentTest.java b/src/test/java/org/apache/commons/io/IOUtilsConcurrentTest.java new file mode 100644 index 00000000000..0e1fb057175 --- /dev/null +++ b/src/test/java/org/apache/commons/io/IOUtilsConcurrentTest.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.commons.io; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; + +import java.io.IOException; +import java.io.InputStream; +import java.io.Reader; +import java.io.StringReader; +import java.nio.charset.Charset; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; +import java.util.zip.CRC32; +import java.util.zip.Checksum; + +import org.apache.commons.io.function.IOConsumer; +import org.apache.commons.io.input.ChecksumInputStream; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +/** + * Tests {@link IOUtils} methods in a concurrent environment. + */ +class IOUtilsConcurrentTest { + + private static class ChecksumReader extends Reader { + private final CRC32 checksum; + private final long expectedChecksumValue; + private final Reader reader; + + ChecksumReader(Reader reader, long expectedChecksumValue) { + this.reader = reader; + this.checksum = new CRC32(); + this.expectedChecksumValue = expectedChecksumValue; + } + + @Override + public void close() throws IOException { + reader.close(); + } + + public long getValue() { + return checksum.getValue(); + } + + @Override + public int read() throws IOException { + return super.read(); + } + + @Override + public int read(char[] cbuf, int off, int len) throws IOException { + final int n = reader.read(cbuf, off, len); + if (n > 0) { + final byte[] bytes = new String(cbuf, off, n).getBytes(Charset.defaultCharset()); + checksum.update(bytes, 0, bytes.length); + } + if (n == -1) { + final long actual = checksum.getValue(); + if (actual != expectedChecksumValue) { + throw new IOException("Checksum mismatch: expected " + expectedChecksumValue + " but got " + actual); + } + } + return n; + } + } + + /** + * Test data for InputStream tests. + */ + private static final byte[][] BYTE_DATA; + /** + * Checksum values for {@link #BYTE_DATA}. + */ + private static final long[] BYTE_DATA_CHECKSUM; + /** + * Number of runs per thread (to increase the chance of collisions). + */ + private static final int RUNS_PER_THREAD = 16; + /** + * Size of test data. + */ + private static final int SIZE = IOUtils.DEFAULT_BUFFER_SIZE; + /** + * Test data for Reader tests. + */ + private static final String[] STRING_DATA; + /** + * Checksum values for {@link #STRING_DATA}. + */ + private static final long[] STRING_DATA_CHECKSUM; + /** + * Number of threads to use. + */ + private static final int THREAD_COUNT = 16; + /** + * Number of data variants (to increase the chance of collisions). + */ + private static final int VARIANTS = 16; + + static { + final Checksum checksum = new CRC32(); + // Byte data + BYTE_DATA = new byte[VARIANTS][]; + BYTE_DATA_CHECKSUM = new long[VARIANTS]; + for (int variant = 0; variant < VARIANTS; variant++) { + final byte[] data = new byte[SIZE]; + for (int i = 0; i < SIZE; i++) { + data[i] = (byte) ((i + variant) % 256); + } + BYTE_DATA[variant] = data; + checksum.reset(); + checksum.update(data, 0 , data.length); + BYTE_DATA_CHECKSUM[variant] = checksum.getValue(); + } + // Char data + final char[] cdata = new char[SIZE]; + STRING_DATA = new String[VARIANTS]; + STRING_DATA_CHECKSUM = new long[VARIANTS]; + for (int variant = 0; variant < VARIANTS; variant++) { + for (int i = 0; i < SIZE; i++) { + cdata[i] = (char) ((i + variant) % Character.MAX_VALUE); + } + STRING_DATA[variant] = new String(cdata); + checksum.reset(); + final byte[] bytes = STRING_DATA[variant].getBytes(Charset.defaultCharset()); + checksum.update(bytes, 0, bytes.length); + STRING_DATA_CHECKSUM[variant] = checksum.getValue(); + } + } + + static Stream> testConcurrentInputStreamTasks() { + return Stream.of( + IOUtils::consume, + in -> IOUtils.skip(in, Long.MAX_VALUE), + in -> IOUtils.skipFully(in, SIZE), + IOUtils::toByteArray, + in -> IOUtils.toByteArray(in, SIZE), + in -> IOUtils.toByteArray(in, SIZE, 512) + ); + } + + static Stream> testConcurrentReaderTasks() { + return Stream.of( + IOUtils::consume, + reader -> IOUtils.skip(reader, Long.MAX_VALUE), + reader -> IOUtils.skipFully(reader, SIZE), + reader -> IOUtils.toByteArray(reader, Charset.defaultCharset()) + ); + } + + @ParameterizedTest + @MethodSource + void testConcurrentInputStreamTasks(IOConsumer consumer) throws InterruptedException { + final ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT); + try { + final List> futures = IntStream.range(0, THREAD_COUNT * RUNS_PER_THREAD) + .>mapToObj(i -> threadPool.submit(() -> { + try (InputStream in = ChecksumInputStream + .builder() + .setByteArray(BYTE_DATA[i % VARIANTS]) + .setChecksum(new CRC32()) + .setExpectedChecksumValue(BYTE_DATA_CHECKSUM[i % VARIANTS]) + .get()) { + consumer.accept(in); + } + return null; + })).collect(Collectors.toList()); + futures.forEach(f -> assertDoesNotThrow(() -> f.get())); + } finally { + threadPool.shutdownNow(); + } + } + + @ParameterizedTest + @MethodSource + void testConcurrentReaderTasks(IOConsumer consumer) throws InterruptedException { + final ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT); + try { + final List> futures = IntStream.range(0, THREAD_COUNT * RUNS_PER_THREAD) + .>mapToObj(i -> threadPool.submit(() -> { + try (Reader reader = new ChecksumReader(new StringReader(STRING_DATA[i % VARIANTS]), STRING_DATA_CHECKSUM[i % VARIANTS])) { + consumer.accept(reader); + } + return null; + })).collect(Collectors.toList()); + futures.forEach(f -> assertDoesNotThrow(() -> f.get())); + } finally { + threadPool.shutdownNow(); + } + } +}