Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ There are three examples for Hadoop processing:
+ [WAT files] Server response analysis using response metadata
+ [WET files] Classic word count example using extracted text

All three assume initially that the files are stored locally in the data subdirectory but can be trivially modified to pull them down from Common Crawl's Amazon S3 bucket. To acquire the files, you can use any HTTP client or (if you are on AWS) the [AWS CLI](https://aws.amazon.com/cli/).
For development, you likely want to start with input files stored locally in the `data/` subdirectory. To acquire the files, you can use any HTTP client or (if you are on AWS) the [AWS CLI](https://aws.amazon.com/cli/).

mkdir data
cd data/
Expand All @@ -23,12 +23,12 @@ or on AWS
aws s3 cp s3://commoncrawl/crawl-data/CC-MAIN-2013-48/segments/1386163035819/warc/CC-MAIN-20131204131715-00000-ip-10-33-133-15.ec2.internal.warc.gz data/
aws s3 cp s3://commoncrawl/crawl-data/CC-MAIN-2013-48/segments/1386163035819/wet/CC-MAIN-20131204131715-00000-ip-10-33-133-15.ec2.internal.warc.wet.gz data/

To build and run:
To build and run in [Hadoop local or non-distributed mode](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/SingleCluster.html#Standalone_Operation):

mvn package
<path-to-hadoop>/bin/hadoop jar target/cc-warc-examples-0.3-SNAPSHOT-jar-with-dependencies.jar org.commoncrawl.examples.mapreduce.WETWordCount
<path-to-hadoop>/bin/hadoop jar target/cc-warc-examples-0.5-SNAPSHOT-jar-with-dependencies.jar org.commoncrawl.examples.mapreduce.WETWordCount -Dmapreduce.framework.name=local file:/tmp/cc/wet-word-count file:$PWD/data/*.wet.gz

All three examples place output in the directory `/tmp/cc`.
Note: all three examples require that you specify the output directory and all input files or directories.

# License

Expand Down
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

<groupId>org.commoncrawl</groupId>
<artifactId>cc-warc-examples</artifactId>
<version>0.4-SNAPSHOT</version>
<version>0.5-SNAPSHOT</version>
<packaging>jar</packaging>

<name>cc-warc-examples</name>
Expand Down Expand Up @@ -55,7 +55,7 @@
<dependency>
<groupId>org.netpreserve.commons</groupId>
<artifactId>webarchive-commons</artifactId>
<version>1.1.9</version>
<version>1.2.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
Expand All @@ -67,7 +67,7 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.4</version>
<version>3.3.6</version>
<scope>provided</scope>
</dependency>
<dependency>
Expand Down
33 changes: 26 additions & 7 deletions src/org/commoncrawl/examples/mapreduce/ServerTypeMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.apache.log4j.Logger;
import org.archive.io.ArchiveReader;
import org.archive.io.ArchiveRecord;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;

Expand Down Expand Up @@ -38,15 +39,33 @@ public void map(Text key, ArchiveReader value, Context context) throws IOExcepti
String content = new String(rawData);
JSONObject json = new JSONObject(content);
try {
String server = json.getJSONObject("Envelope").getJSONObject("Payload-Metadata").getJSONObject("HTTP-Response-Metadata").getJSONObject("Headers").getString("Server");
outKey.set(server);
context.write(outKey, outVal);
String warcType = json.getJSONObject("Envelope").getJSONObject("WARC-Header-Metadata")
.getString("WARC-Type");
if (!warcType.equals("response")) {
continue;
}
JSONObject httpHeaders = json.getJSONObject("Envelope").getJSONObject("Payload-Metadata")
.getJSONObject("HTTP-Response-Metadata").getJSONObject("Headers");
JSONArray httpHeaderNames = httpHeaders.names();
for (int i = 0, l = httpHeaders.length(); i < l; i++) {
String headerName = httpHeaderNames.getString(i);
if (headerName.equalsIgnoreCase("server")) {
Object headerValue = httpHeaders.get(headerName);
if (headerValue instanceof JSONArray) {
for (int j = 0, L = ((JSONArray) headerValue).length(); j < L; j++) {
outKey.set(((JSONArray) headerValue).getString(j));
context.write(outKey, outVal);
}
} else {
outKey.set(headerValue.toString());
context.write(outKey, outVal);
}
}
}
} catch (JSONException ex) {
// If we reach here, the JSON object didn't have the header we were looking for
// There are likely better ways to check for json["Envelope"]["Payload-Metadata"][...] but this is concise
LOG.error("Failed to get HTTP header \"Server\" for " + r.getHeader().getUrl(), ex);
}
}
catch (Exception ex) {
} catch (Exception ex) {
LOG.error("Caught Exception", ex);
context.getCounter(MAPPERCOUNTER.EXCEPTIONS).increment(1);
}
Expand Down
7 changes: 4 additions & 3 deletions src/org/commoncrawl/examples/mapreduce/TagCounterMap.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.commoncrawl.examples.mapreduce;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -35,18 +36,18 @@ public void map(Text key, ArchiveReader value, Context context) throws IOExcepti

for (ArchiveRecord r : value) {
try {
LOG.debug(r.getHeader().getUrl() + " -- " + r.available());
LOG.debug(r.getHeader().getUrl() + " -- " + r.available() + " -- " + r.getHeader().getMimetype());
// We're only interested in processing the responses, not requests or metadata
if (r.getHeader().getMimetype().equals("application/http; msgtype=response")) {
// Convenience function that reads the full message into a raw byte array
byte[] rawData = IOUtils.toByteArray(r, r.available());
String content = new String(rawData);
String content = new String(rawData, StandardCharsets.ISO_8859_1);
// The HTTP header gives us valuable information about what was received during the request
String headerText = content.substring(0, content.indexOf("\r\n\r\n"));

// In our task, we're only interested in text/html, so we can be a little lax
// TODO: Proper HTTP header parsing + don't trust headers
if (headerText.contains("Content-Type: text/html")) {
if (headerText.toLowerCase().contains("content-type: text/html")) {
context.getCounter(MAPPERCOUNTER.RECORDS_IN).increment(1);
// Only extract the body of the HTTP response when necessary
// Due to the way strings work in Java, we don't use any more memory than before
Expand Down
50 changes: 34 additions & 16 deletions src/org/commoncrawl/examples/mapreduce/WARCTagCounter.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package org.commoncrawl.examples.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
Expand Down Expand Up @@ -34,28 +37,43 @@ public static void main(String[] args) throws Exception {

/**
* Builds and runs the Hadoop job.
* @return 0 if the Hadoop job completes successfully and 1 otherwise.
*
* @param args command line arguments
* @return 0 if the Hadoop job completes successfully and 1 otherwise.
*/
@Override
public int run(String[] arg0) throws Exception {
public int run(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: " + this.getClass().getSimpleName() + " <outputpath> <inputpath>...");
return -1;
}
Path outputPath = null;
List<Path> inputPaths = new ArrayList<>();
for (int i = 0; i < args.length; i++) {
if (outputPath == null) {
outputPath = new Path(args[i]);
} else {
inputPaths.add(new Path(args[i]));
}
}
return run(outputPath, inputPaths.toArray(new Path[inputPaths.size()]));
}

public int run(Path outputPath, Path[] inputPaths)
throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = getConf();
//
Job job = new Job(conf);

Job job = Job.getInstance(conf);
job.setJarByClass(WARCTagCounter.class);
job.setNumReduceTasks(1);

String inputPath = "data/*.warc.gz";
//inputPath = "s3://commoncrawl/crawl-data/CC-MAIN-2013-48/segments/1386163035819/wet/CC-MAIN-20131204131715-00000-ip-10-33-133-15.ec2.internal.warc.wet.gz";
//inputPath = "s3://commoncrawl/crawl-data/CC-MAIN-2013-48/segments/1386163035819/wet/*.warc.wet.gz";
LOG.info("Input path: " + inputPath);
FileInputFormat.addInputPath(job, new Path(inputPath));

String outputPath = "/tmp/cc/";
FileSystem fs = FileSystem.newInstance(conf);
if (fs.exists(new Path(outputPath))) {
fs.delete(new Path(outputPath), true);
for (int i = 0; i < inputPaths.length; i++) {
LOG.info("Input path: " + inputPaths[i]);
FileInputFormat.addInputPath(job, inputPaths[i]);
}
FileOutputFormat.setOutputPath(job, new Path(outputPath));

LOG.info("Output path: " + outputPath);
FileOutputFormat.setOutputPath(job, outputPath);

job.setInputFormatClass(WARCFileInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
Expand Down
39 changes: 28 additions & 11 deletions src/org/commoncrawl/examples/mapreduce/WATSampleOutLinks.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,29 @@ public void map(Text key, ArchiveReader value, Context context) throws IOExcepti
for (int i = 0, l = httpHeaders.length(); i < l; i++) {
String headerName = httpHeaderNames.getString(i);
if (headerName.equalsIgnoreCase("x-robots-tag")) {
String headerValue = httpHeaders.getString(headerName);
if (nofollowPattern.matcher(headerValue).find()) {
context.getCounter(COUNTER.RECORDS_NOFOLLOW_X_ROBOTS_SKIPPED).increment(1);
continue record;
Object headerValue = httpHeaders.get(headerName);
if (headerValue instanceof String) {
if (nofollowPattern.matcher((String) headerValue).find()) {
context.getCounter(COUNTER.RECORDS_NOFOLLOW_X_ROBOTS_SKIPPED).increment(1);
continue record;
}
} else if (headerValue instanceof JSONArray) {
for (int j = 0, L = ((JSONArray) headerValue).length(); j < L; j++) {
if (nofollowPattern.matcher(((JSONArray) headerValue).getString(j))
.find()) {
context.getCounter(COUNTER.RECORDS_NOFOLLOW_X_ROBOTS_SKIPPED)
.increment(1);
continue record;
}
}
} else {
LOG.error("Unexpected JSON value type when processing X-Robots-Tag: "
+ headerValue.getClass().getName());
}
break; // no need to iterate over further HTTP headers
/*
* Note: continue to iterate over all HTTP headers because there might be
* variants (lower/upper case) of the "X-Robots-Tag" header
*/
}
}
}
Expand Down Expand Up @@ -226,17 +243,17 @@ public void map(Text key, ArchiveReader value, Context context) throws IOExcepti
context.getCounter(COUNTER.LINKS_PAGE_UNIQ_ACCEPTED).increment(n);
} catch (JSONException ex) {
context.getCounter(COUNTER.EXCEPTIONS_JSON).increment(1);
LOG.error("Caught JSONException", ex);
LOG.error("Caught JSONException while processing record for " + r.getHeader().getUrl(), ex);
} catch (MalformedURLException ex) {
LOG.error("Caught MalformedURLException", ex);
LOG.error("Caught MalformedURLException while processing record for " + r.getHeader().getUrl(),
ex);
context.getCounter(COUNTER.EXCEPTIONS_URL_MALFORMED).increment(1);
} catch (Exception ex) {
context.getCounter(COUNTER.EXCEPTIONS).increment(1);
LOG.error("Caught Exception", ex);
LOG.error("Caught Exception while processing record for " + r.getHeader().getUrl(), ex);
}
}
catch (Exception ex) {
LOG.error("Caught Exception", ex);
} catch (Exception ex) {
LOG.error("Caught Exception while processing record for " + r.getHeader().getUrl(), ex);
context.getCounter(COUNTER.EXCEPTIONS).increment(1);
}
}
Expand Down
49 changes: 33 additions & 16 deletions src/org/commoncrawl/examples/mapreduce/WATServerType.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package org.commoncrawl.examples.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
Expand Down Expand Up @@ -34,28 +37,42 @@ public static void main(String[] args) throws Exception {

/**
* Builds and runs the Hadoop job.
* @return 0 if the Hadoop job completes successfully and 1 otherwise.
*
* @param args command line arguments
* @return 0 if the Hadoop job completes successfully and 1 otherwise.
*/
@Override
public int run(String[] arg0) throws Exception {
public int run(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: " + this.getClass().getSimpleName() + " <outputpath> <inputpath>...");
return -1;
}
Path outputPath = null;
List<Path> inputPaths = new ArrayList<>();
for (int i = 0; i < args.length; i++) {
if (outputPath == null) {
outputPath = new Path(args[i]);
} else {
inputPaths.add(new Path(args[i]));
}
}
return run(outputPath, inputPaths.toArray(new Path[inputPaths.size()]));
}

public int run(Path outputPath, Path[] inputPaths) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = getConf();
//
Job job = new Job(conf);

Job job = Job.getInstance(conf);
job.setJarByClass(WATServerType.class);
job.setNumReduceTasks(1);

String inputPath = "data/*.warc.wat.gz";
//inputPath = "s3://commoncrawl/crawl-data/CC-MAIN-2013-48/segments/1386163035819/wet/CC-MAIN-20131204131715-00000-ip-10-33-133-15.ec2.internal.warc.wet.gz";
//inputPath = "s3://commoncrawl/crawl-data/CC-MAIN-2013-48/segments/1386163035819/wet/*.warc.wet.gz";
LOG.info("Input path: " + inputPath);
FileInputFormat.addInputPath(job, new Path(inputPath));

String outputPath = "/tmp/cc/";
FileSystem fs = FileSystem.newInstance(conf);
if (fs.exists(new Path(outputPath))) {
fs.delete(new Path(outputPath), true);
for (int i = 0; i < inputPaths.length; i++) {
LOG.info("Input path: " + inputPaths[i]);
FileInputFormat.addInputPath(job, inputPaths[i]);
}
FileOutputFormat.setOutputPath(job, new Path(outputPath));

LOG.info("Output path: " + outputPath);
FileOutputFormat.setOutputPath(job, outputPath);

job.setInputFormatClass(WARCFileInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
Expand Down
Loading