From 1c8c07836020db709b6a7995b4f33863b049f56d Mon Sep 17 00:00:00 2001 From: Sebastian Nagel Date: Tue, 3 Mar 2026 11:12:29 +0100 Subject: [PATCH] Simple table converter (for GAW URL list) Convert a two-column tab-separated table into a Parquet table, enriching URL-based rows (domain, tld, etc.) --- pom.xml | 7 + .../commoncrawl/spark/GAWTableConverter.java | 207 ++++++++++++++++++ .../resources/schema/gaw-index-schema.json | 167 ++++++++++++++ 3 files changed, 381 insertions(+) create mode 100644 src/main/java/org/commoncrawl/spark/GAWTableConverter.java create mode 100644 src/main/resources/schema/gaw-index-schema.json diff --git a/pom.xml b/pom.xml index b737e5a..1fc51d8 100644 --- a/pom.xml +++ b/pom.xml @@ -22,6 +22,7 @@ 2.0.17 1.6 + 0.32.0 2.13.2 33.5.0-jre 2.41.32 @@ -106,6 +107,12 @@ + + org.netpreserve + jwarc + ${jwarc.version} + + diff --git a/src/main/java/org/commoncrawl/spark/GAWTableConverter.java b/src/main/java/org/commoncrawl/spark/GAWTableConverter.java new file mode 100644 index 0000000..5b25e6c --- /dev/null +++ b/src/main/java/org/commoncrawl/spark/GAWTableConverter.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.commoncrawl.spark; + +import java.io.IOException; +import java.sql.Timestamp; +import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.DataFrameReader; +import org.apache.spark.sql.DataFrameWriter; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.SparkSession; +import org.commoncrawl.net.HostName; +import org.commoncrawl.net.WarcUri; +import org.netpreserve.jwarc.URIs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GAWTableConverter extends IndexTable { + + private static final Logger LOG = LoggerFactory.getLogger(GAWTableConverter.class); + protected String name = GAWTableConverter.class.getCanonicalName(); + + public static Row convertRow(Row inp) { + try { + String url = inp.getString(0); + String timeStampString = inp.getString(1); + ZonedDateTime fetchTime = fetchTimeParser.parse(timeStampString, ZonedDateTime::from); + Timestamp timestamp = Timestamp.from(fetchTime.toInstant()); + WarcUri uri = new WarcUri(url); + HostName host = uri.getHostName(); + Row h = host.asRow(); + List row = new ArrayList<>(); + // url_surtkey + row.add(URIs.toNormalizedSurt(url)); + // url + row.add(uri.getUrlString()); + // host + row.add(h.get(0)); + row.add(h.get(1)); + row.add(h.get(2)); + row.add(h.get(3)); + row.add(h.get(4)); + row.add(h.get(5)); + row.add(h.get(6)); + row.add(h.get(7)); + row.add(h.get(8)); + row.add(h.get(9)); + row.add(h.get(10)); + // URL components + row.add(uri.getProtocol()); + row.add(uri.getPort()); + row.add(uri.getPath()); + row.add(uri.getQuery()); + // capture time + row.add(timestamp); + return RowFactory.create(row.toArray()); + } catch (Exception e) { + LOG.error("Failed to convert row", e); + return null; + } + } + + public void runJob(String inputPaths, String outputPath, Function mapIndexEntries) throws IOException { + SparkConf conf = new SparkConf(); + conf.setAppName(this.name); + SparkSession spark = SparkSession.builder().config(conf).getOrCreate(); + + LOG.info("Function to convert table rows: {}", mapIndexEntries); + + DataFrameReader reader = spark.read(); + JavaRDD input; + // TODO: support input formats other than CSV + reader = reader.option("delimiter", "\t"); + reader = reader.option("header", "false"); + input = reader.csv(inputPaths).toJavaRDD(); + JavaRDD output = input.map(mapIndexEntries); + + /* + * sort by "url_surtkey" to achieve better compression and better query + * performance + */ + output = output.sortBy(row -> row.getString(0), true, 1); + // TODO: make number of output partitions configurable + + if (verbose) { + LOG.info(schema.prettyJson()); + } + + Dataset df = spark.createDataFrame(output, schema); + + if (verbose) { + df.printSchema(); + // df.show(); // That's expensive in combination with sorting + } + + // Note: cannot use nested columns for partitioning (SPARK-18084) + String[] partitionColumns = {}; + if (!partitionBy.trim().isEmpty()) { + partitionColumns = partitionBy.trim().split("\\s*,\\s*"); + Column[] pCols = new Column[partitionColumns.length + 1]; + for (int i = 0; i < partitionColumns.length; i++) { + pCols[i] = df.col(partitionColumns[i]); + } + // enforce sorting by "url_surtkey" within each partition + // below the columns used for partitioning + pCols[pCols.length - 1] = df.col("url_surtkey"); + df = df.sortWithinPartitions(pCols); + } + + if (verbose) { + df.explain(true); + } + + DataFrameWriter dfw = df.write().format(outputFormat); + dfw.option("compression", outputCompression); + if (partitionColumns.length > 0) { + dfw.partitionBy(partitionColumns); + } + dfw.save(outputPath); + spark.close(); + } + + @Override + protected CommandLine applyCommandLineOptions(CommandLine cli) { + // apply output options defined in IndexTable + super.applyCommandLineOptions(cli); + + // specific input options + // TODO + + return cli; + } + + @Override + public void run(String[] args) throws IOException { + Options options = addCommandLineOptions(new Options()); + + CommandLineParser parser = new DefaultParser(); + CommandLine cli; + + try { + cli = parser.parse(options, args); + } catch (ParseException e) { + System.err.println(e.getMessage()); + help(options); + System.exit(-1); + return; + } + + if (cli.hasOption("help")) { + help(options); + return; + } + + cli = applyCommandLineOptions(cli); + + String[] arguments = cli.getArgs(); + if (arguments.length < 2) { + help(options); + System.exit(1); + } + + String inputPaths = arguments[0]; + String outputPath = arguments[1]; + + if ("orc".equals(outputFormat) && "gzip".equals(outputCompression)) { + // gzip for Parquet, zlib for ORC + outputCompression = "zlib"; + } + + runJob(inputPaths, outputPath, GAWTableConverter::convertRow); + } + + public static void main(String[] args) throws IOException { + IndexTable job = new GAWTableConverter(); + job.run(args); + } + +} diff --git a/src/main/resources/schema/gaw-index-schema.json b/src/main/resources/schema/gaw-index-schema.json new file mode 100644 index 0000000..8087eee --- /dev/null +++ b/src/main/resources/schema/gaw-index-schema.json @@ -0,0 +1,167 @@ +{ + "type": "struct", + "fields": [ + { + "name": "url_surtkey", + "type": "string", + "nullable": false, + "metadata": { + "description": "SURT URL key", + "example": "com,example)/path/index.html" + } + }, + { + "name": "url", + "type": "string", + "nullable": false, + "metadata": { + "description": "URL string", + "example": "https://www.example.com/path/index.html" + } + }, + { + "name": "url_host_name", + "type": "string", + "nullable": false, + "metadata": { + "description": "Hostname, including IP addresses", + "example": "www.example.com" + } + }, + { + "name": "url_host_tld", + "nullable": true, + "type": "string", + "metadata": { + "description": "Top-level domain or last part of the hostname", + "example": "com for the hostname www.example.com" + } + }, + { + "name": "url_host_2nd_last_part", + "nullable": true, + "type": "string", + "metadata": { + "description": "Second last part of the hostname", + "example": "example for the hostname www.example.com, co for bbc.co.uk" + } + }, + { + "name": "url_host_3rd_last_part", + "nullable": true, + "type": "string", + "metadata": { + "description": "Third last part of the hostname", + "example": "www for the hostname www.example.com" + } + }, + { + "name": "url_host_4th_last_part", + "nullable": true, + "type": "string", + "metadata": { + "description": "4th last part of the hostname", + "example": "host1 for host1.subdomain.example.com" + } + }, + { + "name": "url_host_5th_last_part", + "nullable": true, + "type": "string", + "metadata": { + "description": "5th last part of the hostname", + "example": "host1 for host1.sub2.subdomain.example.com" + } + }, + { + "name": "url_host_registry_suffix", + "type": "string", + "nullable": true, + "metadata": { + "description": "Domain registry suffix", + "example": "com, co.uk" + } + }, + { + "name": "url_host_registered_domain", + "type": "string", + "nullable": true, + "metadata": { + "description": "Domain name of the host (one level below the registry suffix)", + "example": "example.com, bbc.co.uk" + } + }, + { + "name": "url_host_private_suffix", + "type": "string", + "nullable": true, + "metadata": { + "description": "Suffix of domain registries including private registrars, see https://publicsuffix.org/", + "example": "com, co.uk, but also s3.amazonaws.com or blogspot.com" + } + }, + { + "name": "url_host_private_domain", + "type": "string", + "nullable": true, + "metadata": { + "description": "Domain name of the host (one level below the private suffix)", + "example": "mypublicbucket.s3.amazonaws.com or myblog.blogspot.com" + } + }, + { + "name": "url_host_name_reversed", + "type": "string", + "nullable": true, + "metadata": { + "description": "Hostname, excluding IP addresses, in reverse domain name notation", + "example": "com.example.www" + } + }, + { + "name": "url_protocol", + "type": "string", + "nullable": false, + "metadata": { + "description": "Protocol of the URL", + "example": "https" + } + }, + { + "name": "url_port", + "type": "integer", + "nullable": true, + "metadata": { + "description": "Port of the URL (null if not explicitly specified in the URL)", + "example": "8443" + } + }, + { + "name": "url_path", + "type": "string", + "nullable": true, + "metadata": { + "description": "File path of the URL", + "example": "/path/index.html" + } + }, + { + "name": "url_query", + "type": "string", + "nullable": true, + "metadata": { + "description": "Query part of the URL", + "example": "q=abc&lang=en for .../search?q=abc&lang=en" + } + }, + { + "name": "fetch_time", + "type": "timestamp", + "nullable": false, + "metadata": { + "description": "Fetch time (capture time stamp)", + "example": "2017-10-24T00:14:32Z" + } + } + ] +}