From 4204cbe63161b32b1bd80b6e58d0952fdceec217 Mon Sep 17 00:00:00 2001 From: Sebastian Nagel Date: Fri, 13 Dec 2024 13:22:40 +0100 Subject: [PATCH 1/5] Upgrade dependencies - Hadoop client: 3.3.4 -> 3.3.6 - webarchive-commons: 1.1.9 -> 1.2.0 Bump version number --- pom.xml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index 17a0170..5e0ddb7 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ org.commoncrawl cc-warc-examples - 0.4-SNAPSHOT + 0.5-SNAPSHOT jar cc-warc-examples @@ -55,7 +55,7 @@ org.netpreserve.commons webarchive-commons - 1.1.9 + 1.2.0 org.apache.hadoop @@ -67,7 +67,7 @@ org.apache.hadoop hadoop-client - 3.3.4 + 3.3.6 provided From 30dd7a7cbf19955ce98055ddd1e6c7336c596bcf Mon Sep 17 00:00:00 2001 From: Sebastian Nagel Date: Fri, 13 Dec 2024 14:10:28 +0100 Subject: [PATCH 2/5] Modernize examples - usage with recent Hadoop versions in local/non-distributed mode - pass input files (or directories) and output directory via command line - add minimal command-line help --- README.md | 8 +- .../examples/mapreduce/WARCTagCounter.java | 50 +++++++---- .../examples/mapreduce/WATServerType.java | 49 +++++++---- .../examples/mapreduce/WETWordCount.java | 86 ++++++++++++------- 4 files changed, 124 insertions(+), 69 deletions(-) diff --git a/README.md b/README.md index 500b380..fe0977a 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ There are three examples for Hadoop processing: + [WAT files] Server response analysis using response metadata + [WET files] Classic word count example using extracted text -All three assume initially that the files are stored locally in the data subdirectory but can be trivially modified to pull them down from Common Crawl's Amazon S3 bucket. To acquire the files, you can use any HTTP client or (if you are on AWS) the [AWS CLI](https://aws.amazon.com/cli/). +For development, you likely want to start with input files stored locally in the `data/` subdirectory. 
To acquire the files, you can use any HTTP client or (if you are on AWS) the [AWS CLI](https://aws.amazon.com/cli/). mkdir data cd data/ @@ -23,12 +23,12 @@ or on AWS aws s3 cp s3://commoncrawl/crawl-data/CC-MAIN-2013-48/segments/1386163035819/warc/CC-MAIN-20131204131715-00000-ip-10-33-133-15.ec2.internal.warc.gz data/ aws s3 cp s3://commoncrawl/crawl-data/CC-MAIN-2013-48/segments/1386163035819/wet/CC-MAIN-20131204131715-00000-ip-10-33-133-15.ec2.internal.warc.wet.gz data/ -To build and run: +To build and run in [Hadoop local or non-distributed mode](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/SingleCluster.html#Standalone_Operation): mvn package - /bin/hadoop jar target/cc-warc-examples-0.3-SNAPSHOT-jar-with-dependencies.jar org.commoncrawl.examples.mapreduce.WETWordCount + /bin/hadoop jar target/cc-warc-examples-0.5-SNAPSHOT-jar-with-dependencies.jar org.commoncrawl.examples.mapreduce.WETWordCount -Dmapreduce.framework.name=local file:/tmp/cc/wet-word-count file:$PWD/data/*.wet.gz -All three examples place output in the directory `/tmp/cc`. +Note: all three examples require that you specify the output directory and all input files or directories. # License diff --git a/src/org/commoncrawl/examples/mapreduce/WARCTagCounter.java b/src/org/commoncrawl/examples/mapreduce/WARCTagCounter.java index 698bb66..6ca92fb 100644 --- a/src/org/commoncrawl/examples/mapreduce/WARCTagCounter.java +++ b/src/org/commoncrawl/examples/mapreduce/WARCTagCounter.java @@ -1,8 +1,11 @@ package org.commoncrawl.examples.mapreduce; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; @@ -34,28 +37,43 @@ public static void main(String[] args) throws Exception { /** * Builds and runs the Hadoop job. 
- * @return 0 if the Hadoop job completes successfully and 1 otherwise. + * + * @param args command line arguments + * @return 0 if the Hadoop job completes successfully and 1 otherwise. */ @Override - public int run(String[] arg0) throws Exception { + public int run(String[] args) throws Exception { + if (args.length < 2) { + System.err.println("Usage: " + this.getClass().getSimpleName() + " ..."); + return -1; + } + Path outputPath = null; + List inputPaths = new ArrayList<>(); + for (int i = 0; i < args.length; i++) { + if (outputPath == null) { + outputPath = new Path(args[i]); + } else { + inputPaths.add(new Path(args[i])); + } + } + return run(outputPath, inputPaths.toArray(new Path[inputPaths.size()])); + } + + public int run(Path outputPath, Path[] inputPaths) + throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = getConf(); - // - Job job = new Job(conf); + + Job job = Job.getInstance(conf); job.setJarByClass(WARCTagCounter.class); job.setNumReduceTasks(1); - String inputPath = "data/*.warc.gz"; - //inputPath = "s3://commoncrawl/crawl-data/CC-MAIN-2013-48/segments/1386163035819/wet/CC-MAIN-20131204131715-00000-ip-10-33-133-15.ec2.internal.warc.wet.gz"; - //inputPath = "s3://commoncrawl/crawl-data/CC-MAIN-2013-48/segments/1386163035819/wet/*.warc.wet.gz"; - LOG.info("Input path: " + inputPath); - FileInputFormat.addInputPath(job, new Path(inputPath)); - - String outputPath = "/tmp/cc/"; - FileSystem fs = FileSystem.newInstance(conf); - if (fs.exists(new Path(outputPath))) { - fs.delete(new Path(outputPath), true); + for (int i = 0; i < inputPaths.length; i++) { + LOG.info("Input path: " + inputPaths[i]); + FileInputFormat.addInputPath(job, inputPaths[i]); } - FileOutputFormat.setOutputPath(job, new Path(outputPath)); + + LOG.info("Output path: " + outputPath); + FileOutputFormat.setOutputPath(job, outputPath); job.setInputFormatClass(WARCFileInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); diff --git 
a/src/org/commoncrawl/examples/mapreduce/WATServerType.java b/src/org/commoncrawl/examples/mapreduce/WATServerType.java index c48d53d..106db97 100644 --- a/src/org/commoncrawl/examples/mapreduce/WATServerType.java +++ b/src/org/commoncrawl/examples/mapreduce/WATServerType.java @@ -1,8 +1,11 @@ package org.commoncrawl.examples.mapreduce; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; @@ -34,28 +37,42 @@ public static void main(String[] args) throws Exception { /** * Builds and runs the Hadoop job. - * @return 0 if the Hadoop job completes successfully and 1 otherwise. + * + * @param args command line arguments + * @return 0 if the Hadoop job completes successfully and 1 otherwise. */ @Override - public int run(String[] arg0) throws Exception { + public int run(String[] args) throws Exception { + if (args.length < 2) { + System.err.println("Usage: " + this.getClass().getSimpleName() + " ..."); + return -1; + } + Path outputPath = null; + List inputPaths = new ArrayList<>(); + for (int i = 0; i < args.length; i++) { + if (outputPath == null) { + outputPath = new Path(args[i]); + } else { + inputPaths.add(new Path(args[i])); + } + } + return run(outputPath, inputPaths.toArray(new Path[inputPaths.size()])); + } + + public int run(Path outputPath, Path[] inputPaths) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = getConf(); - // - Job job = new Job(conf); + + Job job = Job.getInstance(conf); job.setJarByClass(WATServerType.class); job.setNumReduceTasks(1); - String inputPath = "data/*.warc.wat.gz"; - //inputPath = "s3://commoncrawl/crawl-data/CC-MAIN-2013-48/segments/1386163035819/wet/CC-MAIN-20131204131715-00000-ip-10-33-133-15.ec2.internal.warc.wet.gz"; - 
//inputPath = "s3://commoncrawl/crawl-data/CC-MAIN-2013-48/segments/1386163035819/wet/*.warc.wet.gz"; - LOG.info("Input path: " + inputPath); - FileInputFormat.addInputPath(job, new Path(inputPath)); - - String outputPath = "/tmp/cc/"; - FileSystem fs = FileSystem.newInstance(conf); - if (fs.exists(new Path(outputPath))) { - fs.delete(new Path(outputPath), true); + for (int i = 0; i < inputPaths.length; i++) { + LOG.info("Input path: " + inputPaths[i]); + FileInputFormat.addInputPath(job, inputPaths[i]); } - FileOutputFormat.setOutputPath(job, new Path(outputPath)); + + LOG.info("Output path: " + outputPath); + FileOutputFormat.setOutputPath(job, outputPath); job.setInputFormatClass(WARCFileInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); diff --git a/src/org/commoncrawl/examples/mapreduce/WETWordCount.java b/src/org/commoncrawl/examples/mapreduce/WETWordCount.java index db6b91e..69908c5 100644 --- a/src/org/commoncrawl/examples/mapreduce/WETWordCount.java +++ b/src/org/commoncrawl/examples/mapreduce/WETWordCount.java @@ -1,8 +1,11 @@ package org.commoncrawl.examples.mapreduce; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; @@ -17,15 +20,17 @@ import org.commoncrawl.warc.WARCFileInputFormat; /** - * Word count example using the extract text (WET) from the Common Crawl dataset. + * Word count example using the extract text (WET) from the Common Crawl + * dataset. * * @author Stephen Merity (Smerity) */ public class WETWordCount extends Configured implements Tool { private static final Logger LOG = Logger.getLogger(WETWordCount.class); - + /** - * Main entry point that uses the {@link ToolRunner} class to run the Hadoop job. 
+ * Main entry point that uses the {@link ToolRunner} class to run the Hadoop + * job. */ public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new WETWordCount(), args); @@ -34,43 +39,58 @@ public static void main(String[] args) throws Exception { /** * Builds and runs the Hadoop job. - * @return 0 if the Hadoop job completes successfully and 1 otherwise. + * + * @param args command line arguments + * @return 0 if the Hadoop job completes successfully and 1 otherwise. */ @Override - public int run(String[] arg0) throws Exception { + public int run(String[] args) throws Exception { + if (args.length < 2) { + System.err.println("Usage: " + this.getClass().getSimpleName() + " ..."); + return -1; + } + Path outputPath = null; + List inputPaths = new ArrayList<>(); + for (int i = 0; i < args.length; i++) { + if (outputPath == null) { + outputPath = new Path(args[i]); + } else { + inputPaths.add(new Path(args[i])); + } + } + return run(outputPath, inputPaths.toArray(new Path[inputPaths.size()])); + } + + public int run(Path outputPath, Path[] inputPaths) + throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = getConf(); - // - Job job = new Job(conf); + + Job job = Job.getInstance(conf); job.setJarByClass(WETWordCount.class); job.setNumReduceTasks(1); - - String inputPath = "data/*.warc.wet.gz"; - //inputPath = "s3://commoncrawl/crawl-data/CC-MAIN-2013-48/segments/1386163035819/wet/CC-MAIN-20131204131715-00000-ip-10-33-133-15.ec2.internal.warc.wet.gz"; - //inputPath = "s3://commoncrawl/crawl-data/CC-MAIN-2013-48/segments/1386163035819/wet/*.warc.wet.gz"; - LOG.info("Input path: " + inputPath); - FileInputFormat.addInputPath(job, new Path(inputPath)); - - String outputPath = "/tmp/cc/"; - FileSystem fs = FileSystem.newInstance(conf); - if (fs.exists(new Path(outputPath))) { - fs.delete(new Path(outputPath), true); + + for (int i = 0; i < inputPaths.length; i++) { + LOG.info("Input path: " 
+ inputPaths[i]); + FileInputFormat.addInputPath(job, inputPaths[i]); } - FileOutputFormat.setOutputPath(job, new Path(outputPath)); - + + LOG.info("Output path: " + outputPath); + FileOutputFormat.setOutputPath(job, outputPath); + job.setInputFormatClass(WARCFileInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); - + job.setOutputKeyClass(Text.class); - job.setOutputValueClass(LongWritable.class); - - job.setMapperClass(WordCounterMap.WordCountMapper.class); - // The reducer is quite useful in the word frequency task - job.setReducerClass(LongSumReducer.class); - - if (job.waitForCompletion(true)) { - return 0; - } else { - return 1; - } + job.setOutputValueClass(LongWritable.class); + + job.setMapperClass(WordCounterMap.WordCountMapper.class); + // The reducer is quite useful in the word frequency task + job.setReducerClass(LongSumReducer.class); + + if (job.waitForCompletion(true)) { + return 0; + } else { + return 1; + } } } From 4b269de55e4a5f3e8986604c7a234c792a8fe5ee Mon Sep 17 00:00:00 2001 From: Sebastian Nagel Date: Fri, 13 Dec 2024 14:13:47 +0100 Subject: [PATCH 3/5] Improve TagCounterMap - enforce ISO-8859-1 as character set to read HTML code (HTML elements will be readable but text might not) - search for "Content-Type: text/html" also in lower case or any variant --- src/org/commoncrawl/examples/mapreduce/TagCounterMap.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/org/commoncrawl/examples/mapreduce/TagCounterMap.java b/src/org/commoncrawl/examples/mapreduce/TagCounterMap.java index c545259..1fb4290 100644 --- a/src/org/commoncrawl/examples/mapreduce/TagCounterMap.java +++ b/src/org/commoncrawl/examples/mapreduce/TagCounterMap.java @@ -1,6 +1,7 @@ package org.commoncrawl.examples.mapreduce; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -35,18 +36,18 @@ public void map(Text key, ArchiveReader value, Context 
context) throws IOExcepti for (ArchiveRecord r : value) { try { - LOG.debug(r.getHeader().getUrl() + " -- " + r.available()); + LOG.debug(r.getHeader().getUrl() + " -- " + r.available() + " -- " + r.getHeader().getMimetype()); // We're only interested in processing the responses, not requests or metadata if (r.getHeader().getMimetype().equals("application/http; msgtype=response")) { // Convenience function that reads the full message into a raw byte array byte[] rawData = IOUtils.toByteArray(r, r.available()); - String content = new String(rawData); + String content = new String(rawData, StandardCharsets.ISO_8859_1); // The HTTP header gives us valuable information about what was received during the request String headerText = content.substring(0, content.indexOf("\r\n\r\n")); // In our task, we're only interested in text/html, so we can be a little lax // TODO: Proper HTTP header parsing + don't trust headers - if (headerText.contains("Content-Type: text/html")) { + if (headerText.toLowerCase().contains("content-type: text/html")) { context.getCounter(MAPPERCOUNTER.RECORDS_IN).increment(1); // Only extract the body of the HTTP response when necessary // Due to the way strings work in Java, we don't use any more memory than before From 66bc8bf3642899d921b2ac2c09de895cfd0cf5fa Mon Sep 17 00:00:00 2001 From: Sebastian Nagel Date: Fri, 13 Dec 2024 14:17:23 +0100 Subject: [PATCH 4/5] Adapt WATServerType example to read multi-valued headers in WAT data - header data from WARC and HTTP headers will become multi-valued in WAT files. That is, the value is either a string or a list of strings (JSONArray). 
- allow for lowercase/uppercase variants of the HTTP "Server" header --- .../examples/mapreduce/ServerTypeMap.java | 33 +++++++++++++++---- 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/src/org/commoncrawl/examples/mapreduce/ServerTypeMap.java b/src/org/commoncrawl/examples/mapreduce/ServerTypeMap.java index 8009877..40d717c 100644 --- a/src/org/commoncrawl/examples/mapreduce/ServerTypeMap.java +++ b/src/org/commoncrawl/examples/mapreduce/ServerTypeMap.java @@ -9,6 +9,7 @@ import org.apache.log4j.Logger; import org.archive.io.ArchiveReader; import org.archive.io.ArchiveRecord; +import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; @@ -38,15 +39,33 @@ public void map(Text key, ArchiveReader value, Context context) throws IOExcepti String content = new String(rawData); JSONObject json = new JSONObject(content); try { - String server = json.getJSONObject("Envelope").getJSONObject("Payload-Metadata").getJSONObject("HTTP-Response-Metadata").getJSONObject("Headers").getString("Server"); - outKey.set(server); - context.write(outKey, outVal); + String warcType = json.getJSONObject("Envelope").getJSONObject("WARC-Header-Metadata") + .getString("WARC-Type"); + if (!warcType.equals("response")) { + continue; + } + JSONObject httpHeaders = json.getJSONObject("Envelope").getJSONObject("Payload-Metadata") + .getJSONObject("HTTP-Response-Metadata").getJSONObject("Headers"); + JSONArray httpHeaderNames = httpHeaders.names(); + for (int i = 0, l = httpHeaders.length(); i < l; i++) { + String headerName = httpHeaderNames.getString(i); + if (headerName.equalsIgnoreCase("server")) { + Object headerValue = httpHeaders.get(headerName); + if (headerValue instanceof JSONArray) { + for (int j = 0, L = ((JSONArray) headerValue).length(); j < L; j++) { + outKey.set(((JSONArray) headerValue).getString(j)); + context.write(outKey, outVal); + } + } else { + outKey.set(headerValue.toString()); + context.write(outKey, outVal); + } + } + } } catch 
(JSONException ex) { - // If we reach here, the JSON object didn't have the header we were looking for - // There are likely better ways to check for json["Envelope"]["Payload-Metadata"][...] but this is concise + LOG.error("Failed to get HTTP header \"Server\" for " + r.getHeader().getUrl(), ex); } - } - catch (Exception ex) { + } catch (Exception ex) { LOG.error("Caught Exception", ex); context.getCounter(MAPPERCOUNTER.EXCEPTIONS).increment(1); } From 20eca181c2bf388f992046496f3365f101c7226a Mon Sep 17 00:00:00 2001 From: Sebastian Nagel Date: Fri, 13 Dec 2024 15:00:38 +0100 Subject: [PATCH 5/5] Adapt WATSampleOutlinks job to read multi-valued headers in WAT data - ensure that the "nofollow" attributes in "X-Robots-Tag" HTTP headers are properly read independent of whether the header - is repeated (multi-valued) - written in any lower/uppercase variants - improve logging in case of errors --- .../examples/mapreduce/WATSampleOutLinks.java | 39 +++++++++++++------ 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/src/org/commoncrawl/examples/mapreduce/WATSampleOutLinks.java b/src/org/commoncrawl/examples/mapreduce/WATSampleOutLinks.java index caf2ffc..6690b21 100644 --- a/src/org/commoncrawl/examples/mapreduce/WATSampleOutLinks.java +++ b/src/org/commoncrawl/examples/mapreduce/WATSampleOutLinks.java @@ -143,12 +143,29 @@ public void map(Text key, ArchiveReader value, Context context) throws IOExcepti for (int i = 0, l = httpHeaders.length(); i < l; i++) { String headerName = httpHeaderNames.getString(i); if (headerName.equalsIgnoreCase("x-robots-tag")) { - String headerValue = httpHeaders.getString(headerName); - if (nofollowPattern.matcher(headerValue).find()) { - context.getCounter(COUNTER.RECORDS_NOFOLLOW_X_ROBOTS_SKIPPED).increment(1); - continue record; + Object headerValue = httpHeaders.get(headerName); + if (headerValue instanceof String) { + if (nofollowPattern.matcher((String) headerValue).find()) { + 
context.getCounter(COUNTER.RECORDS_NOFOLLOW_X_ROBOTS_SKIPPED).increment(1); + continue record; + } + } else if (headerValue instanceof JSONArray) { + for (int j = 0, L = ((JSONArray) headerValue).length(); j < L; j++) { + if (nofollowPattern.matcher(((JSONArray) headerValue).getString(j)) + .find()) { + context.getCounter(COUNTER.RECORDS_NOFOLLOW_X_ROBOTS_SKIPPED) + .increment(1); + continue record; + } + } + } else { + LOG.error("Unexpected JSON value type when processing X-Robots-Tag: " + + headerValue.getClass().getName()); } - break; // no need to iterate over further HTTP headers + /* + * Note: continue to iterate over all HTTP headers because there might be + * variants (lower/upper case) of the "X-Robots-Tag" header + */ } } } @@ -226,17 +243,17 @@ public void map(Text key, ArchiveReader value, Context context) throws IOExcepti context.getCounter(COUNTER.LINKS_PAGE_UNIQ_ACCEPTED).increment(n); } catch (JSONException ex) { context.getCounter(COUNTER.EXCEPTIONS_JSON).increment(1); - LOG.error("Caught JSONException", ex); + LOG.error("Caught JSONException while processing record for " + r.getHeader().getUrl(), ex); } catch (MalformedURLException ex) { - LOG.error("Caught MalformedURLException", ex); + LOG.error("Caught MalformedURLException while processing record for " + r.getHeader().getUrl(), + ex); context.getCounter(COUNTER.EXCEPTIONS_URL_MALFORMED).increment(1); } catch (Exception ex) { context.getCounter(COUNTER.EXCEPTIONS).increment(1); - LOG.error("Caught Exception", ex); + LOG.error("Caught Exception while processing record for " + r.getHeader().getUrl(), ex); } - } - catch (Exception ex) { - LOG.error("Caught Exception", ex); + } catch (Exception ex) { + LOG.error("Caught Exception while processing record for " + r.getHeader().getUrl(), ex); context.getCounter(COUNTER.EXCEPTIONS).increment(1); } }