diff --git a/src/java/org/commoncrawl/util/UUIDv7.java b/src/java/org/commoncrawl/util/UUIDv7.java new file mode 100644 index 0000000000..02896728f6 --- /dev/null +++ b/src/java/org/commoncrawl/util/UUIDv7.java @@ -0,0 +1,288 @@ +package org.commoncrawl.util; + +import java.security.SecureRandom; +import java.time.Clock; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Thread-safe implementation of UUIDv7 (RFC 9562). + *
+ * Layout (128 bits total): + *
+ * MSB (Most Significant Bits - 64 bits): + * [48 bits] Unix epoch timestamp (milliseconds) + * [ 4 bits] Version = 7 (0111) + * [12 bits] Sub-millisecond sequence counter (for monotonicity) + * + * LSB (Least Significant Bits - 64 bits): + * [ 2 bits] Variant = 2 (10 binary - IETF RFC variant) + * [62 bits] Random bits (collision resistance) + *+ *
+ * Monotonicity Guarantee: + *
Layout of the 64-bit long: + *
+ * [52 bits] timestamp (only lower 48 bits used, upper 4 bits ignored) + * [12 bits] sequence counter + *+ */ + private final AtomicLong state = new AtomicLong(0L); + + /** + * Source of cryptographic randomness for the lower 62 bits of the UUID. + * + * @implNote SecureRandom is used to prevent predictability, which is + * important if UUIDs might be exposed in URLs or logs. + * For pure collision resistance without security requirements, a faster + * RandomGenerator could be substituted. + */ + private final SecureRandom random = new SecureRandom(); + + /** + * Package-private constructor for dependency injection during testing. + */ + UUIDv7(Clock clock) { + this.clock = clock; + } + + /** + * Generates a new UUIDv7 with monotonic ordering guarantees. + *
+ * Lock-free algorithm using Compare-And-Swap (CAS) to ensure thread-safety + * and monotonicity even under high concurrency. + * + * @implSpec + * Algorithm Steps: + *
+ * Constructs the 128-bit UUID according to RFC 9562 layout: + * + *
+ * MSB (64 bits): + * Bits 0-47: Unix timestamp (milliseconds) + * Bits 48-51: Version = 7 (0111 binary) + * Bits 52-63: Sequence counter (12 bits) + * + * LSB (64 bits): + * Bits 0- 1: Variant = 2 (10 binary, meaning bits are 10xxxxxx...) + * Bits 2-63: Random bits (62 bits) + *+ * + * @param timestamp the Unix epoch millisecond timestamp (only lower 48 bits used) + * @param sequence the sub-millisecond sequence counter (0-4095) + * @return a properly formatted UUIDv7 + */ + private UUID buildUUID(long timestamp, long sequence) { + + // STEP 1: HIGH BITS CONSTRUCTION (Most Significant Bits) + + // Start with timestamp in the leftmost 48 bits + // Mask ensures we only use 48 bits: 0xFFFFFFFFFFFF = 48 set bits + // Left shift by 16 to make room for version (4 bits) + sequence (12 bits) + long msb = (timestamp & 0xFFFFFFFFFFFFL) << 16; + + // OR in the version field: 7 in binary is 0111 + // 0x7000 = 0111 0000 0000 0000 in binary (version 7 in correct position) + msb |= 0x7000L; + + // OR in the sequence counter in the lowest 12 bits + // Mask ensures sequence fits in 12 bits: 0xFFF = 0000 1111 1111 1111 + msb |= (sequence & 0xFFFL); + + // STEP 2: LOW BITS CONSTRUCTION (Least Significant Bits) + + long randomBits = this.random.nextLong(); + + // Set variant bits: must be 10 (binary) per RFC 9562 + // 0x3FFFFFFFFFFFFFFF clears top 2 bits: 00111111... + // 0x8000000000000000 sets top bit to 1: 10000000... + // Result: 10xxxxxx... where x = random bits + long lsb = (randomBits & 0x3FFFFFFFFFFFFFFFL) | 0x8000000000000000L; + + // STEP 3: Construct the UUID from the two 64-bit longs + return new UUID(msb, lsb); + } + + /** + * Static factory to generate a new UUIDv7. + *
+ * This method provides a drop-in replacement for {@link UUID#randomUUID()} + * with the added benefits of time-ordering and monotonicity. + *
+ * All UUIDs generated through this method share the same monotonic sequence, + * ensuring process-wide ordering guarantees. + * + * @return a new UUIDv7 + */ + public static UUID randomUUID() { + return SHARED.generate(); + } + + /** + * Creates a UUIDv7 from an explicit Unix epoch millisecond timestamp. + *
+ * This is a stateless factory method: it does not participate in the + * monotonic sequence maintained by {@link #randomUUID()}. The caller + * is responsible for ensuring timestamp ordering. + *
+ * The sequence counter is set to 0 and the lower 62 bits are filled + * with cryptographically secure random data. + * + * @param timestamp Unix epoch milliseconds (must fit in 48 bits) + * @return a UUIDv7 embedding the given timestamp + * @throws IllegalArgumentException if timestamp is negative or >= 2^48 + */ + public static UUID fromTimestamp(long timestamp) { + if ((timestamp >> 48) != 0) { + throw new IllegalArgumentException( + "Timestamp does not fit in 48 bits: " + timestamp); + } + // RFC 9562 Section 6.2 Method 1: fill the 12-bit sub-millisecond + // field with random data. + int random12 = SHARED.random.nextInt(0x1000); + return SHARED.buildUUID(timestamp, random12); + } + + /** + * Creates a UUIDv7 from an explicit Unix epoch millisecond timestamp + * and sequence counter. + *
+ * This is a stateless factory method: it does not participate in the + * monotonic sequence maintained by {@link #randomUUID()}. The caller + * is responsible for ensuring timestamp and sequence ordering. + * + * @param timestamp Unix epoch milliseconds (must fit in 48 bits) + * @param sequence sub-millisecond sequence counter (0-4095) + * @return a UUIDv7 embedding the given timestamp and sequence + * @throws IllegalArgumentException if timestamp is negative or >= 2^48, + * or if sequence is outside the range 0-4095 + */ + public static UUID fromTimestamp(long timestamp, int sequence) { + if ((timestamp >> 48) != 0) { + throw new IllegalArgumentException( + "Timestamp does not fit in 48 bits: " + timestamp); + } + if (sequence < 0 || sequence > 0xFFF) { + throw new IllegalArgumentException( + "Sequence must be in range 0-4095: " + sequence); + } + return SHARED.buildUUID(timestamp, sequence); + } +} diff --git a/src/java/org/commoncrawl/util/WarcWriter.java b/src/java/org/commoncrawl/util/WarcWriter.java index aa9b20ba3b..b42287dec1 100644 --- a/src/java/org/commoncrawl/util/WarcWriter.java +++ b/src/java/org/commoncrawl/util/WarcWriter.java @@ -197,7 +197,7 @@ public URI writeWarcinfoRecord(String filename, String hostname, writeWarcKeyValue(sb, settings); byte[] ba = sb.toString().getBytes(StandardCharsets.UTF_8); - URI recordId = getRecordId(); + URI recordId = getRecordId(date.getTime()); writeRecord(WARC_INFO, date, CONTENT_TYPE_METADATA, recordId, extra, new ByteArrayInputStream(ba), ba.length); @@ -222,7 +222,7 @@ public URI writeWarcRequestRecord(final URI targetUri, final String ip, } } - URI recordId = getRecordId(); + URI recordId = getRecordId(date.getTime()); writeRecord(WARC_REQUEST, date, "application/http; msgtype=request", recordId, extra, block); return recordId; @@ -265,7 +265,7 @@ public URI writeWarcResponseRecord(final URI targetUri, final String ip, extra.put(WARC_IDENTIFIED_PAYLOAD_TYPE, content.getContentType()); - URI recordId = getRecordId(); + URI recordId = getRecordId(date.getTime()); writeRecord(WARC_RESPONSE, date, CONTENT_TYPE_RESPONSE, recordId, extra, block); return recordId; } @@ -305,7 +305,7 @@ public URI writeWarcRevisitRecord(final URI targetUri, final String ip, extra.put(WARC_BLOCK_DIGEST, blockDigest); } - URI recordId = getRecordId(); + URI recordId = getRecordId(date.getTime()); writeRecord(WARC_REVISIT, date, CONTENT_TYPE_RESPONSE, recordId, extra, block); return recordId; } @@ -322,7 +322,7 @@ public URI writeWarcMetadataRecord(final URI targetUri, final Date date, extra.put(WARC_BLOCK_DIGEST, blockDigest); } - URI recordId = getRecordId(); + URI recordId = getRecordId(date.getTime()); writeRecord(WARC_METADATA, date, CONTENT_TYPE_METADATA, recordId, extra, block); return recordId; } @@ -339,7 +339,7 @@ public URI writeWarcConversionRecord(final URI targetUri, final Date date, extra.put(WARC_BLOCK_DIGEST, blockDigest); } - URI recordId = getRecordId(); + URI recordId = getRecordId(date.getTime()); writeRecord(WARC_CONVERSION, date, contentType, recordId, extra, block); return recordId; } @@ -456,10 +456,28 @@ protected static void writeWarcKeyValue(StringBuilder sb, String key, sb.append(key).append(COLONSP).append(value).append(CRLF); } + /** + * This method is deprecated with the introduction of the UUID of type 7 that introduce a timestamp + * component. We use the capture timestamp for composing the UUID. + * + * @see String getUUID(long timestamp) + */ + @Deprecated private String getUUID() { - return UUID.randomUUID().toString(); + return UUIDv7.randomUUID().toString(); } + private String getUUID(long timestamp) { + return UUIDv7.fromTimestamp(timestamp).toString(); + } + + /** + * This method is deprecated with the introduction of the UUID of type 7 that introduce a timestamp + * component. We use the capture timestamp for composing the UUID. + * + * @see String getRecordId(long timestamp) + */ + @Deprecated public URI getRecordId() { try { return new URI("urn:uuid:" + getUUID()); @@ -468,6 +486,14 @@ public URI getRecordId() { } } + public URI getRecordId(long timestamp) { + try { + return new URI("urn:uuid:" + getUUID(timestamp)); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + protected static String getMeta(Metadata metadata, String name) { String value = metadata.get(name); if (value == null) { diff --git a/src/test/org/commoncrawl/util/TestUUIDv7.java b/src/test/org/commoncrawl/util/TestUUIDv7.java new file mode 100644 index 0000000000..493fb7d18a --- /dev/null +++ b/src/test/org/commoncrawl/util/TestUUIDv7.java @@ -0,0 +1,510 @@ +package org.commoncrawl.util; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.Test; + +class TestUUIDv7 { + + /** + * Validates that generated UUIDs have correct version (7) and variant (2) fields + * as required by RFC 9562 for proper UUID classification. + */ + @Test + void versionAndVariantAreCorrect() { + UUID result = UUIDv7.randomUUID(); + + assertEquals(7, result.version(), "must be UUIDv7"); + assertEquals(2, result.variant(), "must be IETF variant"); + } + + /** + * Verifies strict monotonic ordering in a single-threaded context. + *
+ * Tests both code paths: + *
Spawns multiple threads that simultaneously generate UUIDs and verifies: + *
+ * This test verifies: + *
+ * Extracts and validates individual fields: + *
+ * Java's {@link UUID#compareTo(UUID)} uses signed long comparison,
+ * which doesn't match the lexicographic byte ordering required by UUIDv7.
+ *
+ * @param a first UUID
+ * @param b second UUID
+ * @return negative if a < b, zero if a == b, positive if a > b (lexicographically)
+ */
+ private static int compareUnsignedLex(UUID a, UUID b) {
+ int msb = Long.compareUnsigned(a.getMostSignificantBits(), b.getMostSignificantBits());
+ if (msb != 0) {
+ return msb;
+ }
+ return Long.compareUnsigned(a.getLeastSignificantBits(), b.getLeastSignificantBits());
+ }
+
+ /**
+ * Mutable Clock implementation for testing.
+ */
+ private static final class MutableClock extends Clock {
+
+ private volatile long millis;
+
+ MutableClock(long initialMillis) {
+ this.millis = initialMillis;
+ }
+
+ void setMillis(long ms) {
+ this.millis = ms;
+ }
+
+ void addMillis(long delta) {
+ this.millis += delta;
+ }
+
+ @Override
+ public ZoneOffset getZone() {
+ return ZoneOffset.UTC;
+ }
+
+ @Override
+ public Clock withZone(java.time.ZoneId zone) {
+ return this;
+ }
+
+ @Override
+ public long millis() {
+ return this.millis;
+ }
+
+ @Override
+ public Instant instant() {
+ return Instant.ofEpochMilli(this.millis);
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/test/org/commoncrawl/util/TestWarcWriter.java b/src/test/org/commoncrawl/util/TestWarcWriter.java
index 4f7344010d..2ec6478dd9 100644
--- a/src/test/org/commoncrawl/util/TestWarcWriter.java
+++ b/src/test/org/commoncrawl/util/TestWarcWriter.java
@@ -26,58 +26,131 @@
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.net.URI;
+import java.util.ArrayList;
import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.UUID;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import java.util.zip.GZIPInputStream;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestWarcWriter {
- @Test
- public void testWriteRevisitRecordContentType() throws Exception {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- WarcWriter writer = new WarcWriter(bos);
-
- File segmentDir = new File(System.getProperty("test.build.data", "."), "test-segments/20260224170658-revisit");
- assertNotNull(segmentDir, "Missing segment resource");
- String segmentPath = segmentDir.getAbsolutePath();
- String url = "https://de.wikipedia.org/wiki/Wikipedia:WikiCon_2025";
-
- Content content = SegmenterRecordReader.retrieveContent(segmentPath, url);
- URI targetUri = new URI(content.getUrl());
-
- Metadata metadata = content.getMetadata();
- String ip = content.getMetadata().get("_ip_");
- int httpStatusCode = 304;
-
- Date date = HttpDateFormat.toDate(metadata.get("date"));
- URI warcinfoId = writer.getRecordId();
- URI relatedId = writer.getRecordId();
- String warcProfile = WarcWriter.PROFILE_REVISIT_IDENTICAL_DIGEST;
- Date refersToDate = new Date(System.currentTimeMillis() - 3600000);
- String payloadDigest = "sha1:abc123";
- String blockDigest = "sha1:def456";
-
- writer.writeWarcRevisitRecord(targetUri, ip, httpStatusCode, date,
- warcinfoId, relatedId, warcProfile, refersToDate, payloadDigest,
- blockDigest, null, null, content.getContent(), content);
-
- byte[] compressed = bos.toByteArray();
- ByteArrayInputStream bis = new ByteArrayInputStream(compressed);
- GZIPInputStream gis = new GZIPInputStream(bis);
- ByteArrayOutputStream decompressed = new ByteArrayOutputStream();
- gis.transferTo(decompressed);
-
- String warcOutput = decompressed.toString();
-
- assertTrue(warcOutput.contains("WARC-Type: revisit"),
- "WARC record should have WARC-Type: revisit");
- assertTrue(warcOutput.contains("Content-Type: application/http; msgtype=response"),
- "WARC revisit record should have Content-Type: application/http; msgtype=response");
- assertTrue(warcOutput.contains("WARC-Refers-To-Target-URI: https://de.wikipedia.org/wiki/Wikipedia:WikiCon_2025"),
- "WARC record should have WARC-Refers-To-Target-URI header");
- assertTrue(warcOutput.contains("WARC-Profile: " + warcProfile),
- "WARC record should have WARC-Profile header");
- }
+ @Test
+ public void testWriteRevisitRecordContentType() throws Exception {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ WarcWriter writer = new WarcWriter(bos);
+
+ File segmentDir = new File(System.getProperty("test.build.data", "src/testresources"), "test-segments/20260224170658-revisit");
+ assertNotNull(segmentDir, "Missing segment resource");
+ String segmentPath = segmentDir.getAbsolutePath();
+ String url = "https://de.wikipedia.org/wiki/Wikipedia:WikiCon_2025";
+
+ Content content = SegmenterRecordReader.retrieveContent(segmentPath, url);
+ assertThat("Revisit record should not have any payload or content",
+ content.getContent(), is(new byte[]{}));
+ URI targetUri = new URI(content.getUrl());
+
+ Metadata metadata = content.getMetadata();
+ String ip = content.getMetadata().get("_ip_");
+ int httpStatusCode = 304;
+
+ Date date = HttpDateFormat.toDate(metadata.get("date"));
+ URI warcinfoId = writer.getRecordId(date.getTime());
+ URI relatedId = writer.getRecordId(date.getTime());
+ String warcProfile = WarcWriter.PROFILE_REVISIT_IDENTICAL_DIGEST;
+ Date refersToDate = new Date(System.currentTimeMillis() - 3600000);
+ String payloadDigest = "sha1:abc123";
+ String blockDigest = "sha1:def456";
+
+ writer.writeWarcRevisitRecord(targetUri, ip, httpStatusCode, date,
+ warcinfoId, relatedId, warcProfile, refersToDate, payloadDigest,
+ blockDigest, null, null, content.getContent(), content);
+
+ byte[] compressed = bos.toByteArray();
+ ByteArrayInputStream bis = new ByteArrayInputStream(compressed);
+ GZIPInputStream gis = new GZIPInputStream(bis);
+ ByteArrayOutputStream decompressed = new ByteArrayOutputStream();
+ gis.transferTo(decompressed);
+
+ String warcOutput = decompressed.toString();
+
+ assertTrue(warcOutput.contains("WARC-Type: revisit"),
+ "WARC record should have WARC-Type: revisit");
+ assertTrue(warcOutput.contains("Content-Type: application/http; msgtype=response"),
+ "WARC revisit record should have Content-Type: application/http; msgtype=response");
+ assertTrue(warcOutput.contains("WARC-Refers-To-Target-URI: https://de.wikipedia.org/wiki/Wikipedia:WikiCon_2025"),
+ "WARC record should have WARC-Refers-To-Target-URI header");
+ assertTrue(warcOutput.contains("WARC-Profile: " + warcProfile),
+ "WARC record should have WARC-Profile header");
+ }
+
+ @Test
+ public void testWriteRecordWithUUID7() throws Exception {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ WarcWriter writer = new WarcWriter(bos);
+
+ File segmentDir = new File(System.getProperty("test.build.data", "src/testresources"), "test-segments/20150309101656");
+ assertNotNull(segmentDir, "Missing segment resource");
+ String segmentPath = segmentDir.getAbsolutePath();
+
+ String url = "http://avro.apache.org/";
+ Content content = SegmenterRecordReader.retrieveContent(segmentPath, url);
+
+ URI targetUri = new URI(content.getUrl());
+ Metadata metadata = content.getMetadata();
+ String ip = content.getMetadata().get("_ip_");
+ String contentType = content.getMetadata().get("Content-Type");
+ int httpStatusCode = 200;
+
+ Date date = HttpDateFormat.toDate(metadata.get("Date"));
+ URI warcinfoId = writer.getRecordId(date.getTime());
+ String payloadDigest = "sha1:abc123";
+ String blockDigest = "sha1:def456";
+
+ URI recordId = writer.getRecordId(date.getTime());
+
+ writer.writeWarcinfoRecord("output", "avro.apache.org", "CCF", "CCF", "CCBot", warcinfoId.toString(), "blablabal", date);
+ writer.writeWarcRequestRecord(targetUri, ip, date, warcinfoId, null, null, content.getContent());
+ writer.writeWarcResponseRecord(targetUri, ip, httpStatusCode, date, warcinfoId, recordId, payloadDigest, blockDigest, "False", null, null, content.getContent(), content);
+
+ byte[] compressed = bos.toByteArray();
+ ByteArrayInputStream bis = new ByteArrayInputStream(compressed);
+ StringBuilder allRecords = new StringBuilder();
+ while (bis.available() > 0) {
+ GZIPInputStream gis = new GZIPInputStream(bis);
+ ByteArrayOutputStream decompressed = new ByteArrayOutputStream();
+ gis.transferTo(decompressed);
+ allRecords.append(decompressed.toString());
+ }
+ String warcOutput = allRecords.toString();
+
+ Pattern pattern = Pattern.compile("WARC-Record-ID: