diff --git a/src/main/java/org/commoncrawl/spark/CCIndex2Table.java b/src/main/java/org/commoncrawl/spark/CCIndex2Table.java index c78614a..9553f45 100644 --- a/src/main/java/org/commoncrawl/spark/CCIndex2Table.java +++ b/src/main/java/org/commoncrawl/spark/CCIndex2Table.java @@ -17,6 +17,8 @@ package org.commoncrawl.spark; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.UUID; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; @@ -24,11 +26,13 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.commoncrawl.spark.util.CCWarcFilenameParser; -import org.commoncrawl.spark.util.CCWarcFilenameParser.FilenameParts; import org.commoncrawl.spark.util.CCWarcFilenameParser.FilenameParseError; +import org.commoncrawl.spark.util.CCWarcFilenameParser.FilenameParts; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.net.InetAddresses; + /** * Convert Common Crawl's URL index into a tabular format. */ @@ -43,6 +47,7 @@ protected static class CdxLine extends IndexTable.CdxLine { String redirect; String digest; String mime, mimeDetected; + byte[] recordid, ipaddress; String filename; int offset, length; short status; @@ -60,6 +65,19 @@ public CdxLine(String line) throws IOException { mime = getString("mime"); mimeDetected = getString("mime-detected"); + recordid = null; + String id = getString("recordid"); + if (id != null) { + UUID uuid = UUID.fromString(id); + recordid = new byte[16]; + ByteBuffer.wrap(recordid) + .putLong(uuid.getMostSignificantBits()) + .putLong(uuid.getLeastSignificantBits()); + } + String ip = getString("ipaddress"); + if (ip != null) { + ipaddress = InetAddresses.forString(ip).getAddress(); + } filename = getString("filename"); offset = getInt("offset"); length = getInt("length"); @@ -102,7 +120,7 @@ public static Row convertCdxLine(String line) { RowFactory.create(cdx.timestamp, cdx.status, cdx.redirect), // RowFactory .create(cdx.digest, cdx.mime, cdx.mimeDetected, cdx.charset, cdx.languages, cdx.truncated), // - RowFactory.create(cdx.filename, cdx.offset, cdx.length, cdx.segment), // + RowFactory.create(cdx.recordid, cdx.ipaddress, cdx.filename, cdx.offset, cdx.length, cdx.segment), // cdx.crawl, cdx.subset); } else { @@ -142,6 +160,9 @@ public static Row convertCdxLine(String line) { cdx.languages, // content (WARC record payload) truncated (since CC-MAIN-2019-47) cdx.truncated, + // WARC record headers + cdx.recordid, + cdx.ipaddress, // WARC record location cdx.filename, cdx.offset, diff --git a/src/main/java/org/commoncrawl/spark/util/NullOutputCommitter.java b/src/main/java/org/commoncrawl/spark/util/NullOutputCommitter.java index bc6b659..09f7600 100644 --- a/src/main/java/org/commoncrawl/spark/util/NullOutputCommitter.java +++ b/src/main/java/org/commoncrawl/spark/util/NullOutputCommitter.java @@ -16,12 +16,12 @@ */ package org.commoncrawl.spark.util; +import java.io.IOException; + import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import java.io.IOException; - public class NullOutputCommitter extends OutputCommitter { @Override diff --git a/src/main/resources/schema/cc-index-schema-flat.json b/src/main/resources/schema/cc-index-schema-flat.json index f89d663..59081e2 100644 --- a/src/main/resources/schema/cc-index-schema-flat.json +++ b/src/main/resources/schema/cc-index-schema-flat.json @@ -253,6 +253,28 @@ "fromCDX": "warc-truncated" } }, + { + "name": "warc_record_id", + "type": "binary", + "nullable": true, + "metadata": { + "description": "UUID of the WARC record (WARC-Record-ID)", + "example": "019d6d22-5cf4-7ad9-8a24-81690cf43c7d", + "since": "CC-MAIN-2026-21", + "fromCDX": "recordid" + } + }, + { + "name": "warc_ip_address", + "type": "binary", + "nullable": true, + "metadata": { + "description": "Numeric IP address contacted to retrieve the content (WARC-IP-Address)", + "example": "198.202.211.1 or 2620:cb:2000::1", + "since": "CC-MAIN-2026-21", + "fromCDX": "ipaddress" + } + }, { "name": "warc_filename", "type": "string", diff --git a/src/main/resources/schema/cc-index-schema-nested.json b/src/main/resources/schema/cc-index-schema-nested.json index c0c5732..8393567 100644 --- a/src/main/resources/schema/cc-index-schema-nested.json +++ b/src/main/resources/schema/cc-index-schema-nested.json @@ -298,6 +298,28 @@ "type": { "type": "struct", "fields": [ + { + "name": "record_id", + "type": "binary", + "nullable": true, + "metadata": { + "description": "UUID of the WARC record (WARC-Record-ID)", + "example": "019d6d22-5cf4-7ad9-8a24-81690cf43c7d", + "since": "CC-MAIN-2026-21", + "fromCDX": "recordid" + } + }, + { + "name": "ip_address", + "type": "binary", + "nullable": true, + "metadata": { + "description": "Numeric IP address contacted to retrieve the content (WARC-IP-Address)", + "example": "198.202.211.1 or 2620:cb:2000::1", + "since": "CC-MAIN-2026-21", + "fromCDX": "ipaddress" + } + }, { "name": "filename", "type": "string", diff --git a/src/script/convert_url_index.sh b/src/script/convert_url_index.sh index 60bfa63..5852894 100755 --- a/src/script/convert_url_index.sh +++ b/src/script/convert_url_index.sh @@ -55,6 +55,7 @@ $SPARK_HOME/bin/spark-submit \ --executor-cores $EXECUTOR_CORES \ --executor-memory $EXECUTOR_MEM \ --conf spark.hadoop.parquet.enable.dictionary=true \ + --conf 'spark.hadoop.parquet.enable.dictionary#warc_record_id=false' \ --conf spark.sql.parquet.filterPushdown=true \ --conf spark.sql.parquet.mergeSchema=false \ --conf spark.sql.hive.metastorePartitionPruning=true \