diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml new file mode 100644 index 0000000..bdbf1f6 --- /dev/null +++ b/.github/workflows/build.yml @@ -0,0 +1,29 @@ +name: cc-warc-examples build + +on: + push: + branches: + - master + pull_request: + branches: + - master + +jobs: + build: + runs-on: ubuntu-latest + strategy: + matrix: + java: [ 11, 17, 21 ] + name: Java ${{ matrix.java }} + steps: + - uses: actions/checkout@v6 + + - name: Setup JDK + uses: actions/setup-java@v5 + with: + distribution: 'temurin' + java-version: ${{ matrix.java }} + cache: 'maven' + + - name: Build + run: mvn verify javadoc:aggregate diff --git a/.gitignore b/.gitignore index c84686a..04bcd36 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,7 @@ data/ bin/ +/target/ +/.classpath +/.project +/.settings diff --git a/README.md b/README.md index b9591d8..74345fd 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -![Common Crawl Logo](http://commoncrawl.org/wp-content/uploads/2012/04/ccLogo.png) +![Common Crawl Logo](https://avatars.githubusercontent.com/u/1194841?s=64) # Common Crawl WARC Examples @@ -10,11 +10,26 @@ There are three examples for Hadoop processing: + [WAT files] Server response analysis using response metadata + [WET files] Classic word count example using extracted text -All three assume initially that the files are stored locally but can be trivially modified to pull them down from Common Crawl's Amazon S3 bucket. -To acquire the files, you can use [S3Cmd](http://s3tools.org/s3cmd) or similar. +For development, you likely want to start with input files stored locally in the `data/` subdirectory. To acquire the files, you can use any HTTP client or (if you are on AWS) the [AWS CLI](https://aws.amazon.com/cli/). - s3cmd get s3://aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2013-48/segments/1386163035819/warc/CC-MAIN-20131204131715-00000-ip-10-33-133-15.ec2.internal.warc.gz + mkdir data + cd data/ + wget https://data.commoncrawl.org/crawl-data/CC-MAIN-2013-48/segments/1386163035819/warc/CC-MAIN-20131204131715-00000-ip-10-33-133-15.ec2.internal.warc.gz + wget https://data.commoncrawl.org/crawl-data/CC-MAIN-2013-48/segments/1386163035819/wet/CC-MAIN-20131204131715-00000-ip-10-33-133-15.ec2.internal.warc.wet.gz +or on AWS + + mkdir data + aws s3 cp s3://commoncrawl/crawl-data/CC-MAIN-2013-48/segments/1386163035819/warc/CC-MAIN-20131204131715-00000-ip-10-33-133-15.ec2.internal.warc.gz data/ + aws s3 cp s3://commoncrawl/crawl-data/CC-MAIN-2013-48/segments/1386163035819/wet/CC-MAIN-20131204131715-00000-ip-10-33-133-15.ec2.internal.warc.wet.gz data/ + +To build and run in [Hadoop local or non-distributed mode](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/SingleCluster.html#Standalone_Operation): + + mvn package + /bin/hadoop jar target/cc-warc-examples-0.5-SNAPSHOT-jar-with-dependencies.jar org.commoncrawl.examples.mapreduce.WETWordCount -Dmapreduce.framework.name=local file:/tmp/cc/wet-word-count file:$PWD/data/*.wet.gz + +Note: all three examples require that you specify the output directory and all input files or directories. + # License MIT License, as per `LICENSE` diff --git a/eclipse-formatter.xml b/eclipse-formatter.xml new file mode 100644 index 0000000..e9ac2f0 --- /dev/null +++ b/eclipse-formatter.xml @@ -0,0 +1,404 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/lib/webarchive-commons-jar-with-dependencies.jar b/lib/webarchive-commons-jar-with-dependencies.jar deleted file mode 100644 index f1766aa..0000000 Binary files a/lib/webarchive-commons-jar-with-dependencies.jar and /dev/null differ diff --git a/pom.xml b/pom.xml index 97fdf14..a40d4dc 100644 --- a/pom.xml +++ b/pom.xml @@ -1,207 +1,213 @@ - - 4.0.0 + + + 4.0.0 - org.commoncrawl - cc-warc-examples - 0.1-SNAPSHOT - jar + org.commoncrawl + cc-warc-examples + 0.6-SNAPSHOT + jar - cc-warc-examples - - Common Crawl WARC Examples. - Contains both wrappers for processing WARC files in Hadoop MapReduce jobs and Hadoop examples to get you started. - - https://github.com/Smerity/cc-warc-examples + cc-warc-examples + Common Crawl WARC Examples. + Contains both wrappers for processing WARC files in Hadoop MapReduce jobs and Hadoop examples to get you started. + https://github.com/commoncrawl/cc-warc-examples - - - The MIT License - http://www.opensource.org/licenses/mit-license.php - repo - - + + + The MIT License + http://www.opensource.org/licenses/mit-license.php + repo + + - - scm:git:git@github.com:Smerity/cc-warc-examples.git - git@github.com:Smerity/cc-warc-examples.git - - - - UTF-8 - ${maven.build.timestamp} - yyyyMMddhhmmss - - - sonatype-nexus-staging - https://oss.sonatype.org/service/local/staging/deploy/maven2/ - sonatype-nexus-snapshots - https://oss.sonatype.org/content/repositories/snapshots/ - + + scm:git:git@github.com:commoncrawl/cc-warc-examples.git + git@github.com:commoncrawl/cc-warc-examples.git + + + UTF-8 + ${maven.build.timestamp} + yyyyMMddhhmmss + + + sonatype-nexus-staging + https://oss.sonatype.org/service/local/staging/deploy/maven2/ + sonatype-nexus-snapshots + https://oss.sonatype.org/content/repositories/snapshots/ + - - - log4j - log4j - 1.2.17 - + + + log4j + log4j + 1.2.17 + - - commons-io - commons-io - 2.4 - + + commons-io + commons-io + 2.11.0 + - - org.netpreserve.commons - webarchive-commons - 1.1.2 - - - - org.apache.hadoop - hadoop-core - 0.20.2-cdh3u4 - - + + org.netpreserve.commons + webarchive-commons + 3.0.2 + + + org.apache.hadoop + hadoop-core + + + - - src - - - - org.apache.maven.plugins - maven-compiler-plugin - 2.3.2 - - 1.5 - 1.5 - - - - maven-assembly-plugin - 2.4 - - - jar-with-dependencies - - cc-warc-examples-${project.version} - - - - package - - single - - - - - - maven-javadoc-plugin - 2.7 - - true - .svn - UTF-8 - UTF-8 - - - - - jar - javadoc - - - - site - pre-site - - javadoc - - - - - - maven-source-plugin - 2.1.1 - - - - jar - - - - - - org.apache.maven.plugins - maven-release-plugin - 2.2.2 - - - + + org.apache.hadoop + hadoop-client + 3.3.6 + provided + + + net.java.dev.jets3t + jets3t + 0.9.4 + + - - - release-sign-artifacts - - - performRelease - true - - - - - - org.apache.maven.plugins - maven-gpg-plugin - 1.4 - - ${gpg.passphrase} - - - - sign-artifacts - verify - - sign - - - - - - - - + - - - cloudera - Cloudera Hadoop - https://repository.cloudera.com/artifactory/cloudera-repos/ - default + + + org.apache.maven.plugins + maven-compiler-plugin + 3.15.0 + + 11 + 11 + + + + maven-assembly-plugin + 3.8.0 + + + jar-with-dependencies + + cc-warc-examples-${project.version} + + + + + single + + package + + + + + maven-javadoc-plugin + 3.12.0 + + true + .svn + UTF-8 + + + + + jar + javadoc + + + + site + + javadoc + + pre-site + + + + + maven-source-plugin + 3.4.0 + + + + jar + + + + + + org.apache.maven.plugins + maven-release-plugin + 3.3.1 + + + com.diffplug.spotless + spotless-maven-plugin + 2.46.1 + + + + + pom.xml + + + all + true + false + -1 + recommended_2008_06 + + + + + ${project.basedir}/eclipse-formatter.xml + + + + + + src + + + + + release-sign-artifacts + + + performRelease + true + + + + + + org.apache.maven.plugins + maven-gpg-plugin + 3.2.8 + + ${gpg.passphrase} + + + + sign-artifacts + + sign + + verify + + + + + + + - - true - daily - warn - - - true - daily - warn - - - - - - - ${repository.id} - ${repository.url} - - - ${snapshotRepository.id} - ${snapshotRepository.url} - - - diff --git a/src/org/commoncrawl/examples/S3ReaderTest.java b/src/org/commoncrawl/examples/S3ReaderTest.java index fefa4ce..1355573 100644 --- a/src/org/commoncrawl/examples/S3ReaderTest.java +++ b/src/org/commoncrawl/examples/S3ReaderTest.java @@ -1,54 +1,56 @@ package org.commoncrawl.examples; + import java.io.IOException; import org.archive.io.ArchiveReader; import org.archive.io.ArchiveRecord; import org.archive.io.warc.WARCReaderFactory; import org.jets3t.service.S3Service; -import org.jets3t.service.S3ServiceException; +import org.jets3t.service.ServiceException; import org.jets3t.service.impl.rest.httpclient.RestS3Service; import org.jets3t.service.model.S3Object; /** - * This is a raw example of how you can retrieve a file from the - * Common Crawl S3 bucket without credentials using JetS3t. + * This is a raw example of how you can retrieve a file from the Common Crawl S3 bucket without + * credentials using JetS3t. * * @author Stephen Merity (Smerity) */ public class S3ReaderTest { - public static void main(String[] args) throws IOException, S3ServiceException { + public static void main(String[] args) throws IOException, ServiceException { // We're accessing a publicly available bucket so don't need to fill in our credentials S3Service s3s = new RestS3Service(null); - + // Let's grab a file out of the CommonCrawl S3 bucket - String fn = "common-crawl/crawl-data/CC-MAIN-2013-48/segments/1386163035819/warc/CC-MAIN-20131204131715-00000-ip-10-33-133-15.ec2.internal.warc.gz"; - S3Object f = s3s.getObject("aws-publicdatasets", fn, null, null, null, null, null, null); - + String fn = "crawl-data/CC-MAIN-2013-48/segments/1386163035819/warc/CC-MAIN-20131204131715-00000-ip-10-33-133-15.ec2.internal.warc.gz"; + S3Object f = s3s.getObject("commoncrawl", fn, null, null, null, null, null, null); + // The file name identifies the ArchiveReader and indicates if it should be decompressed ArchiveReader ar = WARCReaderFactory.get(fn, f.getDataInputStream(), true); - + // Once we have an ArchiveReader, we can work through each of the records it contains int i = 0; - for(ArchiveRecord r : ar) { + for (ArchiveRecord r : ar) { // The header file contains information such as the type of record, size, creation time, and URL System.out.println("Header: " + r.getHeader()); System.out.println("URL: " + r.getHeader().getUrl()); System.out.println(); - + // If we want to read the contents of the record, we can use the ArchiveRecord as an InputStream // Create a byte array that is as long as all the record's stated length byte[] rawData = new byte[r.available()]; r.read(rawData); // Note: potential optimization would be to have a large buffer only allocated once - + // Why don't we convert it to a string and print the start of it? Let's hope it's text! String content = new String(rawData); System.out.println(content.substring(0, Math.min(500, content.length()))); System.out.println((content.length() > 500 ? "..." : "")); - - // Pretty printing to make the output more readable + + // Pretty printing to make the output more readable System.out.println("=-=-=-=-=-=-=-=-="); - if (i++ > 4) break; + if (i++ > 4) + break; } } } \ No newline at end of file diff --git a/src/org/commoncrawl/examples/WARCReaderTest.java b/src/org/commoncrawl/examples/WARCReaderTest.java index ea18f6a..972e9ea 100644 --- a/src/org/commoncrawl/examples/WARCReaderTest.java +++ b/src/org/commoncrawl/examples/WARCReaderTest.java @@ -1,4 +1,5 @@ package org.commoncrawl.examples; + import java.io.FileInputStream; import java.io.IOException; @@ -8,43 +9,44 @@ import org.archive.io.warc.WARCReaderFactory; /** - * A raw example of how to process a WARC file using the org.archive.io package. - * Common Crawl S3 bucket without credentials using JetS3t. + * A raw example of how to process a WARC file using the org.archive.io package. Common Crawl S3 + * bucket without credentials using JetS3t. * * @author Stephen Merity (Smerity) */ public class WARCReaderTest { /** * @param args - * @throws IOException + * @throws IOException */ public static void main(String[] args) throws IOException { - // Set up a local compressed WARC file for reading + // Set up a local compressed WARC file for reading String fn = "data/CC-MAIN-20131204131715-00000-ip-10-33-133-15.ec2.internal.warc.gz"; FileInputStream is = new FileInputStream(fn); // The file name identifies the ArchiveReader and indicates if it should be decompressed ArchiveReader ar = WARCReaderFactory.get(fn, is, true); - + // Once we have an ArchiveReader, we can work through each of the records it contains int i = 0; - for(ArchiveRecord r : ar) { + for (ArchiveRecord r : ar) { // The header file contains information such as the type of record, size, creation time, and URL System.out.println(r.getHeader()); System.out.println(r.getHeader().getUrl()); System.out.println(); - + // If we want to read the contents of the record, we can use the ArchiveRecord as an InputStream // Create a byte array that is as long as the record's stated length byte[] rawData = IOUtils.toByteArray(r, r.available()); - + // Why don't we convert it to a string and print the start of it? Let's hope it's text! String content = new String(rawData); System.out.println(content.substring(0, Math.min(500, content.length()))); System.out.println((content.length() > 500 ? "..." : "")); - - // Pretty printing to make the output more readable + + // Pretty printing to make the output more readable System.out.println("=-=-=-=-=-=-=-=-="); - if (i++ > 4) break; + if (i++ > 4) + break; } } } \ No newline at end of file diff --git a/src/org/commoncrawl/examples/mapreduce/ServerTypeMap.java b/src/org/commoncrawl/examples/mapreduce/ServerTypeMap.java index 8009877..c290150 100644 --- a/src/org/commoncrawl/examples/mapreduce/ServerTypeMap.java +++ b/src/org/commoncrawl/examples/mapreduce/ServerTypeMap.java @@ -9,15 +9,17 @@ import org.apache.log4j.Logger; import org.archive.io.ArchiveReader; import org.archive.io.ArchiveRecord; +import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; public class ServerTypeMap { private static final Logger LOG = Logger.getLogger(ServerTypeMap.class); + protected static enum MAPPERCOUNTER { - RECORDS_IN, - NO_SERVER, - EXCEPTIONS + RECORDS_IN, // + NO_SERVER, // + EXCEPTIONS // } protected static class ServerMapper extends Mapper { @@ -38,15 +40,36 @@ public void map(Text key, ArchiveReader value, Context context) throws IOExcepti String content = new String(rawData); JSONObject json = new JSONObject(content); try { - String server = json.getJSONObject("Envelope").getJSONObject("Payload-Metadata").getJSONObject("HTTP-Response-Metadata").getJSONObject("Headers").getString("Server"); - outKey.set(server); - context.write(outKey, outVal); + String warcType = json.getJSONObject("Envelope") + .getJSONObject("WARC-Header-Metadata") + .getString("WARC-Type"); + if (!warcType.equals("response")) { + continue; + } + JSONObject httpHeaders = json.getJSONObject("Envelope") + .getJSONObject("Payload-Metadata") + .getJSONObject("HTTP-Response-Metadata") + .getJSONObject("Headers"); + JSONArray httpHeaderNames = httpHeaders.names(); + for (int i = 0, l = httpHeaders.length(); i < l; i++) { + String headerName = httpHeaderNames.getString(i); + if (headerName.equalsIgnoreCase("server")) { + Object headerValue = httpHeaders.get(headerName); + if (headerValue instanceof JSONArray) { + for (int j = 0, L = ((JSONArray) headerValue).length(); j < L; j++) { + outKey.set(((JSONArray) headerValue).getString(j)); + context.write(outKey, outVal); + } + } else { + outKey.set(headerValue.toString()); + context.write(outKey, outVal); + } + } + } } catch (JSONException ex) { - // If we reach here, the JSON object didn't have the header we were looking for - // There are likely better ways to check for json["Envelope"]["Payload-Metadata"][...] but this is concise + LOG.error("Failed to get HTTP header \"Server\" for " + r.getHeader().getUrl(), ex); } - } - catch (Exception ex) { + } catch (Exception ex) { LOG.error("Caught Exception", ex); context.getCounter(MAPPERCOUNTER.EXCEPTIONS).increment(1); } diff --git a/src/org/commoncrawl/examples/mapreduce/TagCounterMap.java b/src/org/commoncrawl/examples/mapreduce/TagCounterMap.java index c545259..cc5c663 100644 --- a/src/org/commoncrawl/examples/mapreduce/TagCounterMap.java +++ b/src/org/commoncrawl/examples/mapreduce/TagCounterMap.java @@ -1,6 +1,7 @@ package org.commoncrawl.examples.mapreduce; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -14,9 +15,10 @@ public class TagCounterMap { private static final Logger LOG = Logger.getLogger(TagCounterMap.class); + protected static enum MAPPERCOUNTER { - RECORDS_IN, - EXCEPTIONS + RECORDS_IN, // + EXCEPTIONS // } protected static class TagCounterMapper extends Mapper { @@ -32,21 +34,21 @@ protected static class TagCounterMapper extends Mapper ..."); + return -1; + } + Path outputPath = null; + List inputPaths = new ArrayList<>(); + for (int i = 0; i < args.length; i++) { + if (outputPath == null) { + outputPath = new Path(args[i]); + } else { + inputPaths.add(new Path(args[i])); + } + } + return run(outputPath, inputPaths.toArray(new Path[inputPaths.size()])); + } + + public int run(Path outputPath, Path[] inputPaths) + throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = getConf(); - // - Job job = new Job(conf); + + Job job = Job.getInstance(conf); job.setJarByClass(WARCTagCounter.class); job.setNumReduceTasks(1); - - String inputPath = "data/*.warc.gz"; - //inputPath = "s3n://aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2013-48/segments/1386163035819/wet/CC-MAIN-20131204131715-00000-ip-10-33-133-15.ec2.internal.warc.wet.gz"; - //inputPath = "s3n://aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2013-48/segments/1386163035819/wet/*.warc.wet.gz"; - LOG.info("Input path: " + inputPath); - FileInputFormat.addInputPath(job, new Path(inputPath)); - - String outputPath = "/tmp/cc/"; - FileSystem fs = FileSystem.newInstance(conf); - if (fs.exists(new Path(outputPath))) { - fs.delete(new Path(outputPath), true); + + for (int i = 0; i < inputPaths.length; i++) { + LOG.info("Input path: " + inputPaths[i]); + FileInputFormat.addInputPath(job, inputPaths[i]); } - FileOutputFormat.setOutputPath(job, new Path(outputPath)); + + LOG.info("Output path: " + outputPath); + FileOutputFormat.setOutputPath(job, outputPath); job.setInputFormatClass(WARCFileInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(Text.class); - job.setOutputValueClass(LongWritable.class); - - job.setMapperClass(TagCounterMap.TagCounterMapper.class); - job.setReducerClass(LongSumReducer.class); + job.setOutputValueClass(LongWritable.class); + + job.setMapperClass(TagCounterMap.TagCounterMapper.class); + job.setReducerClass(LongSumReducer.class); - return job.waitForCompletion(true) ? 0 : -1; + return job.waitForCompletion(true) ? 0 : -1; } } diff --git a/src/org/commoncrawl/examples/mapreduce/WATSampleOutLinks.java b/src/org/commoncrawl/examples/mapreduce/WATSampleOutLinks.java new file mode 100644 index 0000000..c7eeb95 --- /dev/null +++ b/src/org/commoncrawl/examples/mapreduce/WATSampleOutLinks.java @@ -0,0 +1,490 @@ +package org.commoncrawl.examples.mapreduce; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.regex.Pattern; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.log4j.Logger; +import org.archive.io.ArchiveReader; +import org.archive.io.ArchiveRecord; +import org.commoncrawl.warc.WARCFileInputFormat; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; + +/** + * Extract and randomly sample outlinks (links to pages, not image and media links) from WAT + * files. + */ +public class WATSampleOutLinks extends Configured implements Tool { + + private static final Logger LOG = Logger.getLogger(WATSampleOutLinks.class); + + protected static enum COUNTER { + RECORDS, // + RESPONSE_RECORDS, // + RECORDS_NON_HTML, // + RECORDS_NOFOLLOW_X_ROBOTS_SKIPPED, // + RECORDS_NOFOLLOW_META_SKIPPED, // + EXCEPTIONS, // + EXCEPTIONS_JSON, // + EXCEPTIONS_URL_MALFORMED, // + LINKS_PAGE_ACCEPTED, // + LINKS_TOTAL, // + LINKS_MEDIA_SKIPPED, // + LINKS_REL_NOFOLLOW_SKIPPED, // + LINKS_UNSAFE_SKIPPED, // + LINKS_PAGE_UNIQ, // + LINKS_PAGE_UNIQ_ACCEPTED, // + LINKS_PAGE_UNIQ_SKIPPED_MAX_PER_PAGE, // + LINKS_RANDOM_SKIP, // + LINKS_RANDOM_SAMPLED, // + LINKS_MALFORMED_URL, // + LINKS_UNSAFE_TEXT_SKIPPED /** URL contains tab or newline character */ + } + + private static final Pattern dataUriPattern = Pattern.compile("@/data-(?:href|uri)$"); + private static final Pattern globalLinkPattern = Pattern.compile("^(?:[a-z][a-z0-9]{1,5}:)?//"); + private static Pattern nofollowPattern = Pattern.compile("\\bnofollow\\b", Pattern.CASE_INSENSITIVE); + + protected static class OutLinkMapper extends Mapper { + private Text outKey = new Text(); + private LongWritable outVal = new LongWritable(1); + private LongWritable one = new LongWritable(1); + int maxOutlinksPerPage = 80; + boolean outlinksWeightedCount = false; + boolean respectNofollow = false; + boolean extractFeed = false; + String extractFeedMarker = ""; + Pattern nofollowBotPattern = null; + + @Override + public void setup(Context context) { + Configuration conf = context.getConfiguration(); + maxOutlinksPerPage = conf.getInt("wat.outlinks.max.per.page", 80); + /** + * weighted link counts: each page can distributed `wat.outlinks.max.per.page` points, links + * from pages with many links get a lower weight, the weight is calculated as + * `wat.outlinks.max.per.page / num_links_of_page` + */ + outlinksWeightedCount = conf.getBoolean("wat.outlinks.weighted.count", false); + extractFeed = conf.getBoolean("wat.outlinks.extract.feed", false); + extractFeedMarker = conf.get("wat.outlinks.extract.feed.marker", ""); + respectNofollow = conf.getBoolean("wat.outlinks.respect.nofollow", false); + if (respectNofollow) { + String nofollowBotPatternString = conf.get("wat.outlinks.respect.nofollow.bot.pattern", ""); + if (!nofollowBotPatternString.isBlank()) { + try { + nofollowBotPattern = Pattern + .compile("\\s*" + nofollowBotPatternString + "\\s*", Pattern.CASE_INSENSITIVE); + } catch (IllegalArgumentException e) { + LOG.error("Failed to compile wat.outlinks.respect.nofollow.bot.pattern", e); + } + } + } + } + + @Override + public void map(Text key, ArchiveReader value, Context context) throws IOException { + record: for (ArchiveRecord r : value) { + // Skip any records that are not JSON + if (!r.getHeader().getMimetype().equals("application/json")) { + continue record; + } + try { + context.getCounter(COUNTER.RECORDS).increment(1); + // Convenience function that reads the full message into a raw byte array + byte[] rawData = IOUtils.toByteArray(r, r.available()); + String content = new String(rawData); + try { + JSONObject json = new JSONObject(content); + JSONObject warcHeader = json.getJSONObject("Envelope").getJSONObject("WARC-Header-Metadata"); + String warcType = warcHeader.getString("WARC-Type"); + if (!warcType.equals("response")) { + continue record; + } + context.getCounter(COUNTER.RESPONSE_RECORDS).increment(1); + String base = warcHeader.getString("WARC-Target-URI"); + if (base.charAt(0) == '<') { + // some WARC file enclose the WARC-Target-URI in <...> + base = base.substring(1, (base.length() - 2)); + } + URL baseUrl = new URL(base); + JSONObject responseMetaData = json.getJSONObject("Envelope") + .getJSONObject("Payload-Metadata") + .getJSONObject("HTTP-Response-Metadata"); + if (respectNofollow) { + // check HTTP header "X-Robots-Tag", eg. + // X-Robots-Tag: noindex, nofollow + // Note: only the first Header value is preserved in WAT files + JSONObject httpHeaders = responseMetaData.getJSONObject("Headers"); + JSONArray httpHeaderNames = httpHeaders.names(); + for (int i = 0, l = httpHeaders.length(); i < l; i++) { + String headerName = httpHeaderNames.getString(i); + if (headerName.equalsIgnoreCase("x-robots-tag")) { + Object headerValue = httpHeaders.get(headerName); + if (headerValue instanceof String) { + if (nofollowPattern.matcher((String) headerValue).find()) { + context.getCounter(COUNTER.RECORDS_NOFOLLOW_X_ROBOTS_SKIPPED).increment(1); + continue record; + } + } else if (headerValue instanceof JSONArray) { + for (int j = 0, L = ((JSONArray) headerValue).length(); j < L; j++) { + if (nofollowPattern.matcher(((JSONArray) headerValue).getString(j)) + .find()) { + context.getCounter(COUNTER.RECORDS_NOFOLLOW_X_ROBOTS_SKIPPED) + .increment(1); + continue record; + } + } + } else { + LOG.error( + "Unexpected JSON value type when processing X-Robots-Tag: " + + headerValue.getClass().getName()); + } + /* + * Note: continue to iterate over all HTTP headers because there might be variants + * (lower/upper case) of the "X-Robots-Tag" header + */ + } + } + } + if (!responseMetaData.has("HTML-Metadata")) { + context.getCounter(COUNTER.RECORDS_NON_HTML).increment(1); + continue record; + } + JSONObject htmlMetaData = responseMetaData.getJSONObject("HTML-Metadata"); + Set outLinks = new HashSet<>(); + if (htmlMetaData.has("Head")) { + JSONObject head = htmlMetaData.getJSONObject("Head"); + if (head.has("Base")) { + base = head.getString("Base"); + try { + URL b = new URL(baseUrl, base); + baseUrl = b; + } catch (MalformedURLException ex) { + LOG.error("Ignoring malformed base URL '" + base + "': " + ex.getMessage()); + } + } + if (head.has("Metas")) { + JSONArray metas = head.getJSONArray("Metas"); + for (int i = 0, l = metas.length(); i < l; i++) { + JSONObject meta = metas.getJSONObject(i); + if (meta.has("property") && meta.getString("property").equals("og:url") + && meta.has("content")) { + try { + URL url = new URL(baseUrl, meta.getString("content")); + context.getCounter(COUNTER.LINKS_TOTAL).increment(1); + outLinks.add(url.toString()); + } catch (MalformedURLException ex) { + context.getCounter(COUNTER.LINKS_MALFORMED_URL).increment(1); + } + } + if (respectNofollow && meta.has("name") && (meta.getString("name") + .equalsIgnoreCase("robots") + || (nofollowBotPattern != null + && nofollowBotPattern.matcher(meta.getString("name")).matches()))) { + // check HTML meta "robots" + if (meta.has("content") + && nofollowPattern.matcher(meta.getString("content")).find()) { + context.getCounter(COUNTER.RECORDS_NOFOLLOW_META_SKIPPED).increment(1); + continue record; + } + } + } + } + if (head.has("Link")) { + // + addOutLinks(context, outLinks, baseUrl, head.getJSONArray("Link")); + } + } + if (htmlMetaData.has("Links")) { + JSONArray links = htmlMetaData.getJSONArray("Links"); + addOutLinks(context, outLinks, baseUrl, links); + } + context.getCounter(COUNTER.LINKS_PAGE_UNIQ).increment(outLinks.size()); + if (outlinksWeightedCount) { + if (outLinks.size() >= maxOutlinksPerPage) { + outVal = one; + } else { + outVal = new LongWritable(Math.round(1.0d * maxOutlinksPerPage / outLinks.size())); + } + } + int n = 0; + for (String url : outLinks) { + n++; + outKey.set(url); + context.write(outKey, outVal); + if (n > maxOutlinksPerPage) { + context.getCounter(COUNTER.LINKS_PAGE_UNIQ_SKIPPED_MAX_PER_PAGE) + .increment(outLinks.size() - n); + break; + } + } + context.getCounter(COUNTER.LINKS_PAGE_UNIQ_ACCEPTED).increment(n); + } catch (JSONException ex) { + context.getCounter(COUNTER.EXCEPTIONS_JSON).increment(1); + LOG.error("Caught JSONException while processing record for " + r.getHeader().getUrl(), ex); + } catch (MalformedURLException ex) { + LOG.error( + "Caught MalformedURLException while processing record for " + r.getHeader().getUrl(), + ex); + context.getCounter(COUNTER.EXCEPTIONS_URL_MALFORMED).increment(1); + } catch (Exception ex) { + context.getCounter(COUNTER.EXCEPTIONS).increment(1); + LOG.error("Caught Exception while processing record for " + r.getHeader().getUrl(), ex); + } + } catch (Exception ex) { + LOG.error("Caught Exception while processing record for " + r.getHeader().getUrl(), ex); + context.getCounter(COUNTER.EXCEPTIONS).increment(1); + } + } + + } + + private void addOutLinks(Context context, Collection outLinks, URL baseUrl, JSONArray links) + throws JSONException { + context.getCounter(COUNTER.LINKS_TOTAL).increment(links.length()); + links: for (int i = 0, l = links.length(); i < l; i++) { + JSONObject link = links.getJSONObject(i); + if (link.has("url") && link.has("path")) { + String linkTypeMarker = ""; + String path = link.getString("path"); + String urlStr = link.getString("url"); + path: switch (path) { + case "A@/href": + if (respectNofollow && link.has("rel") + && nofollowPattern.matcher(link.getString("rel")).find()) { + context.getCounter(COUNTER.LINKS_REL_NOFOLLOW_SKIPPED).increment(1); + continue links; + } + break path; + case "IMG@/src": + case "FORM@/action": + case "TD@/background": + case "TABLE@/background": + case "BODY@/background": + case "AUDIO@/src": + case "VIDEO@/src": + case "TR@/background": + // ignore images and media + context.getCounter(COUNTER.LINKS_MEDIA_SKIPPED).increment(1); + continue links; + case "LINK@/href": + if (link.has("rel")) { + switch (link.getString("rel")) { + case "canonical": + break path; + case "alternate": + if (link.has("hreflang")) { + // sample translations + break path; + } + if (extractFeed && link.has("type")) { + String type = link.getString("type"); + if ("application/atom+xml".equals(type) || "application/rss+xml".equals(type)) { + linkTypeMarker = extractFeedMarker; + break path; + } + } + // fall-through for other rel links + default: + // ignore rels not explicitly listed + context.getCounter(COUNTER.LINKS_MEDIA_SKIPPED).increment(1); + continue links; + } + } + break path; + default: + if (dataUriPattern.matcher(path).find()) { + if (globalLinkPattern.matcher(urlStr).find()) { + // ok, it's a global link, should work + } else { + // relative links in data-* attributes are not safe because + // Javascript is required to make them absolute/global + context.getCounter(COUNTER.LINKS_UNSAFE_SKIPPED).increment(1); + continue links; + } + } + } + context.getCounter(COUNTER.LINKS_PAGE_ACCEPTED).increment(1); + try { + URL url = new URL(baseUrl, urlStr); + outLinks.add(linkTypeMarker + url.toString()); + } catch (MalformedURLException ex) { + context.getCounter(COUNTER.LINKS_MALFORMED_URL).increment(1); + } + } + } + } + } + + protected static class OutLinkCombiner extends Reducer { + private LongWritable outVal = new LongWritable(1); + + /** + * @return true if text is safe and does not contain any control characters (U+0000 - U+001F) + * including '\t', '\r', '\n' + */ + public static boolean isSafeText(Text text) { + int pos = 0; + for (byte b : text.getBytes()) { + if (++pos > text.getLength()) { + // cf. Text#getBytes() + break; + } + if ((b & ~((byte) 0x1F)) == 0) { + // none of the leading 3 bits is set: 0x00 <= b <= 0x1F + return false; + } + } + return true; + } + + @Override + public void reduce(Text key, Iterable values, Context context) + throws IOException, InterruptedException { + if (!isSafeText(key)) { + context.getCounter(COUNTER.LINKS_UNSAFE_TEXT_SKIPPED).increment(1); + return; + } + long sum = 0; + for (LongWritable val : values) { + sum += val.get(); + } + outVal.set(sum); + context.write(key, outVal); + } + + } + + protected static class OutLinkReducer extends OutLinkCombiner { + + private double sampleProbability = .5; + private LongWritable outVal = new LongWritable(1); + + @Override + public void setup(Context context) { + sampleProbability = context.getConfiguration().getDouble("wat.outlinks.sample.probability", .5); + LOG.info("Outlink sample probability = " + sampleProbability); + // invert sample probability for comparison with random number (0.0 <= random < 1.0) + // choose link if random number is greater than or equal inverted probability + sampleProbability = (1.0 - sampleProbability); + } + + @Override + public void reduce(Text key, Iterable values, Context context) + throws IOException, InterruptedException { + if (!isSafeText(key)) { + context.getCounter(COUNTER.LINKS_UNSAFE_TEXT_SKIPPED).increment(1); + return; + } + long sum = 0; + for (LongWritable val : values) { + sum += val.get(); + } + if (sampleProbability <= 0.0 || (sum * Math.random()) >= sampleProbability) { + // multiply random by number of times outlink URL has been observed + outVal.set(sum); + context.write(key, outVal); + context.getCounter(COUNTER.LINKS_RANDOM_SAMPLED).increment(1); + } else { + context.getCounter(COUNTER.LINKS_RANDOM_SKIP).increment(1); + } + } + + } + + @Override + public int run(String[] args) throws Exception { + if (args.length < 2) { + System.err.println("Usage: WATSampleOutLinks [-Dproperty=value ...] ..."); + System.err.println(" -Dwat.outlinks.sample.probability="); + System.err.println(" \t\tprobability (0.0 < prob <= 1.0) to select an outlink"); + System.err.println(" -Dwat.outlinks.max.per.page=n"); + System.err.println(" \t\tmax. number of accepted outlinks per page"); + System.err.println(" -Dwat.outlinks.respect.nofollow="); + System.err.println(" \t\twhether to respect the nofollow link attributes and robots metadata"); + System.err.println(" -Dwat.outlinks.respect.nofollow.bot.pattern=mybot"); + System.err.println(" \t\tuser-specific bot name(s) when respecting nofollow robots HTML metadata,"); + System.err.println(" \t\tdefined as regular expression pattern. The nofollow metadata instructions"); + System.err.println(" \t\tfor the matched bot(s) are respected in addition to those addressing any bot."); + return -1; + } + Path outputPath = null; + List inputPaths = new ArrayList<>(); + for (int i = 0; i < args.length; i++) { + if (outputPath == null) { + outputPath = new Path(args[i]); + } else { + inputPaths.add(new Path(args[i])); + } + } + return run(outputPath, inputPaths.toArray(new Path[inputPaths.size()])); + } + + public int run(Path outputPath, Path[] inputPaths) + throws IOException, ClassNotFoundException, InterruptedException { + Configuration conf = getConf(); + + Job job = Job.getInstance(conf); + job.setJarByClass(WATSampleOutLinks.class); + + double sampleProbability = conf.getDouble("wat.outlinks.sample.probability", .5); + + for (int i = 0; i < inputPaths.length; i++) { + LOG.info("Input path: " + inputPaths[i]); + FileInputFormat.addInputPath(job, inputPaths[i]); + } + + FileOutputFormat.setOutputPath(job, outputPath); + LOG.info("Output path: " + outputPath); + + job.setInputFormatClass(WARCFileInputFormat.class); + job.setOutputFormatClass(TextOutputFormat.class); + + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(LongWritable.class); + + job.setMapperClass(OutLinkMapper.class); + job.setCombinerClass(OutLinkCombiner.class); + if (sampleProbability >= 1.0) { + LOG.info("Sample probablity >= 1.0: no random sampling, output all outlinks"); + job.setReducerClass(OutLinkCombiner.class); + } else { + LOG.info("Sampling outlinks with probability " + sampleProbability); + job.setReducerClass(OutLinkReducer.class); + } + + if (job.waitForCompletion(true)) { + return 0; + } + return 1; + } + + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(new Configuration(), new WATSampleOutLinks(), args); + System.exit(res); + } + +} diff --git a/src/org/commoncrawl/examples/mapreduce/WATServerType.java b/src/org/commoncrawl/examples/mapreduce/WATServerType.java index 8e6ea29..3e635fa 100644 --- a/src/org/commoncrawl/examples/mapreduce/WATServerType.java +++ b/src/org/commoncrawl/examples/mapreduce/WATServerType.java @@ -1,8 +1,11 @@ package org.commoncrawl.examples.mapreduce; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; @@ -23,9 +26,9 @@ */ public class WATServerType extends Configured implements Tool { private static final Logger LOG = Logger.getLogger(WATServerType.class); - + /** - * Main entry point that uses the {@link ToolRunner} class to run the Hadoop job. + * Main entry point that uses the {@link ToolRunner} class to run the Hadoop job. */ public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new WATServerType(), args); @@ -34,42 +37,57 @@ public static void main(String[] args) throws Exception { /** * Builds and runs the Hadoop job. - * @return 0 if the Hadoop job completes successfully and 1 otherwise. + * + * @param args command line arguments + * @return 0 if the Hadoop job completes successfully and 1 otherwise. */ @Override - public int run(String[] arg0) throws Exception { + public int run(String[] args) throws Exception { + if (args.length < 2) { + System.err.println("Usage: " + this.getClass().getSimpleName() + " ..."); + return -1; + } + Path outputPath = null; + List inputPaths = new ArrayList<>(); + for (int i = 0; i < args.length; i++) { + if (outputPath == null) { + outputPath = new Path(args[i]); + } else { + inputPaths.add(new Path(args[i])); + } + } + return run(outputPath, inputPaths.toArray(new Path[inputPaths.size()])); + } + + public int run(Path outputPath, Path[] inputPaths) + throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = getConf(); - // - Job job = new Job(conf); + + Job job = Job.getInstance(conf); job.setJarByClass(WATServerType.class); job.setNumReduceTasks(1); - - String inputPath = "data/*.warc.wat.gz"; - //inputPath = "s3n://aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2013-48/segments/1386163035819/wet/CC-MAIN-20131204131715-00000-ip-10-33-133-15.ec2.internal.warc.wet.gz"; - //inputPath = "s3n://aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2013-48/segments/1386163035819/wet/*.warc.wet.gz"; - LOG.info("Input path: " + inputPath); - FileInputFormat.addInputPath(job, new Path(inputPath)); - - String outputPath = "/tmp/cc/"; - FileSystem fs = FileSystem.newInstance(conf); - if (fs.exists(new Path(outputPath))) { - fs.delete(new Path(outputPath), true); + + for (int i = 0; i < inputPaths.length; i++) { + LOG.info("Input path: " + inputPaths[i]); + FileInputFormat.addInputPath(job, inputPaths[i]); } - FileOutputFormat.setOutputPath(job, new Path(outputPath)); - + + LOG.info("Output path: " + outputPath); + FileOutputFormat.setOutputPath(job, outputPath); + job.setInputFormatClass(WARCFileInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); - + job.setOutputKeyClass(Text.class); - job.setOutputValueClass(LongWritable.class); - - job.setMapperClass(ServerTypeMap.ServerMapper.class); - job.setReducerClass(LongSumReducer.class); - - if (job.waitForCompletion(true)) { - return 0; - } else { - return 1; - } + job.setOutputValueClass(LongWritable.class); + + job.setMapperClass(ServerTypeMap.ServerMapper.class); + job.setReducerClass(LongSumReducer.class); + + if (job.waitForCompletion(true)) { + return 0; + } else { + return 1; + } } } diff --git a/src/org/commoncrawl/examples/mapreduce/WETWordCount.java b/src/org/commoncrawl/examples/mapreduce/WETWordCount.java index 3094fa8..b9e6bad 100644 --- a/src/org/commoncrawl/examples/mapreduce/WETWordCount.java +++ b/src/org/commoncrawl/examples/mapreduce/WETWordCount.java @@ -1,8 +1,11 @@ package org.commoncrawl.examples.mapreduce; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; @@ -23,9 +26,9 @@ */ public class WETWordCount extends Configured implements Tool { private static final Logger LOG = Logger.getLogger(WETWordCount.class); - + /** - * Main entry point that uses the {@link ToolRunner} class to run the Hadoop job. + * Main entry point that uses the {@link ToolRunner} class to run the Hadoop job. */ public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new WETWordCount(), args); @@ -34,43 +37,58 @@ public static void main(String[] args) throws Exception { /** * Builds and runs the Hadoop job. - * @return 0 if the Hadoop job completes successfully and 1 otherwise. + * + * @param args command line arguments + * @return 0 if the Hadoop job completes successfully and 1 otherwise. */ @Override - public int run(String[] arg0) throws Exception { + public int run(String[] args) throws Exception { + if (args.length < 2) { + System.err.println("Usage: " + this.getClass().getSimpleName() + " ..."); + return -1; + } + Path outputPath = null; + List inputPaths = new ArrayList<>(); + for (int i = 0; i < args.length; i++) { + if (outputPath == null) { + outputPath = new Path(args[i]); + } else { + inputPaths.add(new Path(args[i])); + } + } + return run(outputPath, inputPaths.toArray(new Path[inputPaths.size()])); + } + + public int run(Path outputPath, Path[] inputPaths) + throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = getConf(); - // - Job job = new Job(conf); + + Job job = Job.getInstance(conf); job.setJarByClass(WETWordCount.class); job.setNumReduceTasks(1); - - String inputPath = "data/*.warc.wet.gz"; - //inputPath = "s3n://aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2013-48/segments/1386163035819/wet/CC-MAIN-20131204131715-00000-ip-10-33-133-15.ec2.internal.warc.wet.gz"; - //inputPath = "s3n://aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2013-48/segments/1386163035819/wet/*.warc.wet.gz"; - LOG.info("Input path: " + inputPath); - FileInputFormat.addInputPath(job, new Path(inputPath)); - - String outputPath = "/tmp/cc/"; - FileSystem fs = FileSystem.newInstance(conf); - if (fs.exists(new Path(outputPath))) { - fs.delete(new Path(outputPath), true); + + for (int i = 0; i < inputPaths.length; i++) { + LOG.info("Input path: " + inputPaths[i]); + FileInputFormat.addInputPath(job, inputPaths[i]); } - FileOutputFormat.setOutputPath(job, new Path(outputPath)); - + + LOG.info("Output path: " + outputPath); + FileOutputFormat.setOutputPath(job, outputPath); + job.setInputFormatClass(WARCFileInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); - + job.setOutputKeyClass(Text.class); - job.setOutputValueClass(LongWritable.class); - - job.setMapperClass(WordCounterMap.WordCountMapper.class); - // The reducer is quite useful in the word frequency task - job.setReducerClass(LongSumReducer.class); - - if (job.waitForCompletion(true)) { - return 0; - } else { - return 1; - } + job.setOutputValueClass(LongWritable.class); + + job.setMapperClass(WordCounterMap.WordCountMapper.class); + // The reducer is quite useful in the word frequency task + job.setReducerClass(LongSumReducer.class); + + if (job.waitForCompletion(true)) { + return 0; + } else { + return 1; + } } } diff --git a/src/org/commoncrawl/examples/mapreduce/WordCounterMap.java b/src/org/commoncrawl/examples/mapreduce/WordCounterMap.java index 3f0211a..d8d549e 100644 --- a/src/org/commoncrawl/examples/mapreduce/WordCounterMap.java +++ b/src/org/commoncrawl/examples/mapreduce/WordCounterMap.java @@ -13,11 +13,12 @@ public class WordCounterMap { private static final Logger LOG = Logger.getLogger(WordCounterMap.class); + protected static enum MAPPERCOUNTER { - RECORDS_IN, - EMPTY_PAGE_TEXT, - EXCEPTIONS, - NON_PLAIN_TEXT + RECORDS_IN, // + EMPTY_PAGE_TEXT, // + EXCEPTIONS, // + NON_PLAIN_TEXT // } protected static class WordCountMapper extends Mapper { @@ -48,8 +49,7 @@ public void map(Text key, ArchiveReader value, Context context) throws IOExcepti } else { context.getCounter(MAPPERCOUNTER.NON_PLAIN_TEXT).increment(1); } - } - catch (Exception ex) { + } catch (Exception ex) { LOG.error("Caught Exception", ex); context.getCounter(MAPPERCOUNTER.EXCEPTIONS).increment(1); } diff --git a/src/org/commoncrawl/warc/WARCFileInputFormat.java b/src/org/commoncrawl/warc/WARCFileInputFormat.java index 89d2433..d752774 100644 --- a/src/org/commoncrawl/warc/WARCFileInputFormat.java +++ b/src/org/commoncrawl/warc/WARCFileInputFormat.java @@ -12,8 +12,8 @@ import org.archive.io.ArchiveReader; /** - * Minimal implementation of FileInputFormat for WARC files. - * Hadoop is told that splitting these compressed files is not possible. + * Minimal implementation of FileInputFormat for WARC files. Hadoop is told that splitting these + * compressed files is not possible. * * @author Stephen Merity (Smerity) */ @@ -24,7 +24,7 @@ public RecordReader createRecordReader(InputSplit split, Ta throws IOException, InterruptedException { return new WARCFileRecordReader(); } - + @Override protected boolean isSplitable(JobContext context, Path filename) { // As these are compressed files, they cannot be (sanely) split diff --git a/src/org/commoncrawl/warc/WARCFileRecordReader.java b/src/org/commoncrawl/warc/WARCFileRecordReader.java index b1e8e1e..a31d6ad 100644 --- a/src/org/commoncrawl/warc/WARCFileRecordReader.java +++ b/src/org/commoncrawl/warc/WARCFileRecordReader.java @@ -15,9 +15,9 @@ import org.archive.io.warc.WARCReaderFactory; /** - * The WARC File Record Reader processes a single compressed input. - * The Record Reader returns a single WARC ArchiveReader that can contain - * numerous individual documents, each document handled in a single mapper. + * The WARC File Record Reader processes a single compressed input. The Record Reader returns a + * single WARC ArchiveReader that can contain numerous individual documents, each document + * handled in a single mapper. * * @author Stephen Merity (Smerity) */ @@ -28,8 +28,7 @@ public class WARCFileRecordReader extends RecordReader { private boolean hasBeenRead = false; @Override - public void initialize(InputSplit inputSplit, TaskAttemptContext context) - throws IOException, InterruptedException { + public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException { FileSplit split = (FileSplit) inputSplit; Configuration conf = context.getConfiguration(); Path path = split.getPath();