diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index dc52ef3..5c79a4c 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -63,5 +63,7 @@ jobs: with: distribution: adopt java-version: ${{ matrix.java }} + - name: Check code formatting + run: mvn -B --no-transfer-progress com.cosium.code:git-code-format-maven-plugin:validate-code-format -Dskip.format.code=false - name: Build with Maven run: mvn -B --no-transfer-progress package --file pom.xml -DCI_ENV=true verify diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..37effdb --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +.idea +target +opensearchdata +warcdata +.java-version \ No newline at end of file diff --git a/.mvn/jvm.config b/.mvn/jvm.config new file mode 100644 index 0000000..87ae20c --- /dev/null +++ b/.mvn/jvm.config @@ -0,0 +1,8 @@ +--add-exports jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED +--add-exports jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED +--add-exports jdk.compiler/com.sun.tools.javac.main=ALL-UNNAMED +--add-exports jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED +--add-exports jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED +--add-exports jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED +--add-opens jdk.compiler/com.sun.tools.javac.code=ALL-UNNAMED +--add-opens jdk.compiler/com.sun.tools.javac.comp=ALL-UNNAMED diff --git a/README.md b/README.md index 4efbf08..874da21 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ Crawler for news based on [StormCrawler](https://stormcrawler.apache.org/). Prod ## Prerequisites - +* JVM 17 or higher * Install OpenSearch 2.19.5 * Install Apache Storm 2.8.8 * Start OpenSearch and Storm @@ -119,3 +119,18 @@ NewsCrawl ACTIVE 48 1 146 NewsCrawl-1 $> storm kill NewsCrawl ``` +## Note for developers + +Please format your code before submitting a PR with + +``` +mvn git-code-format:format-code -Dgcf.globPattern="**/*" -Dskip.format.code=false +``` + +You can enable pre-commit format hooks by running: + +``` +mvn clean install -Dskip.format.code=false +``` + + diff --git a/pom.xml b/pom.xml index 0508210..69b7487 100644 --- a/pom.xml +++ b/pom.xml @@ -1,5 +1,4 @@ - - - + 4.0.0 org.commoncrawl.stormcrawler.news crawler 3.6.0 jar + + https://github.com/commoncrawl/news-crawl Apache License, Version 2.0 @@ -35,8 +36,6 @@ under the License. - https://github.com/commoncrawl/news-crawl - UTF-8 3.6.0 @@ -47,89 +46,10 @@ under the License. 5.23.0 3.0.1 4.13.2 + 5.4 + true - - - - org.apache.maven.plugins - maven-compiler-plugin - 3.15.0 - - 17 - 17 - - - - org.codehaus.mojo - exec-maven-plugin - 3.6.3 - - - - exec - - - - - java - true - false - compile - - - - org.apache.maven.plugins - maven-shade-plugin - 3.6.2 - - - package - - shade - - - false - - - - org.apache.storm.flux.Flux - - - - - - - - - - *:* - - META-INF/*.SF - META-INF/*.DSA - META-INF/*.RSA - - - - - org.apache.storm:flux-core - - org/apache/commons/** - org/apache/http/** - org/yaml/** - - - - - - - - - - org.apache.stormcrawler @@ -207,4 +127,125 @@ under the License. test + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.15.0 + + 17 + 17 + + + + org.codehaus.mojo + exec-maven-plugin + 3.6.3 + + java + true + false + compile + + + + + exec + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.6.2 + + + + shade + + package + + false + + + + org.apache.storm.flux.Flux + + + + + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + org.apache.storm:flux-core + + org/apache/commons/** + org/apache/http/** + org/yaml/** + + + + + + + + + com.cosium.code + git-code-format-maven-plugin + ${git-code-format-maven-plugin.version} + + + + install-formatter-hook + + install-hooks + + + + + validate-code-format + + validate-code-format + + + + + + + com.cosium.code + google-java-format + ${git-code-format-maven-plugin.version} + + + + ${skip.format.code} + + true + false + false + false + + + + + diff --git a/src/main/java/org/commoncrawl/stormcrawler/filter/FastURLFilter.java b/src/main/java/org/commoncrawl/stormcrawler/filter/FastURLFilter.java index 7c52716..9b2ee32 100644 --- a/src/main/java/org/commoncrawl/stormcrawler/filter/FastURLFilter.java +++ b/src/main/java/org/commoncrawl/stormcrawler/filter/FastURLFilter.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -16,6 +16,16 @@ */ package org.commoncrawl.stormcrawler.filter; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.S3Object; +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.LinkedHashMultimap; +import com.google.common.collect.Multimap; import java.io.BufferedInputStream; import java.io.BufferedReader; import java.io.IOException; @@ -29,7 +39,6 @@ import java.util.TimerTask; import java.util.regex.Pattern; import java.util.zip.GZIPInputStream; - import org.apache.commons.lang3.StringUtils; import org.apache.stormcrawler.JSONResource; import org.apache.stormcrawler.Metadata; @@ -38,28 +47,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.AmazonS3ClientBuilder; -import com.amazonaws.services.s3.model.GetObjectRequest; -import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.S3Object; -import com.fasterxml.jackson.core.JsonParseException; -import com.fasterxml.jackson.databind.JsonMappingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.collect.LinkedHashMultimap; -import com.google.common.collect.Multimap; - /** - * Version of the FastURLFilter that can load from a text representation instead - * of the JSON that the SC version handles. Can also reload periodically and get - * its content from S3. - * - * Filters URLs based on a file of regular expressions using host/domains - * matching first. The default policy is to accept a URL if no matches are - * found. + * Version of the FastURLFilter that can load from a text representation instead of the JSON that + * the SC version handles. Can also reload periodically and get its content from S3. + * + *

Filters URLs based on a file of regular expressions using host/domains matching first. The + * default policy is to accept a URL if no matches are found. + * + *

Rule Format: * - * Rule Format: - * *

  * Host www.example.org
  *   DenyPath /path/to/be/excluded
@@ -72,48 +68,45 @@
  * Domain example.org
  *   DenyPathQuery /resource/.*?action=exclude
  * 
- * - * Host rules are evaluated before Domain rules. For - * Host rules the entire host name of a URL must match while the - * domain names in Domain rules are considered as matches if the - * domain is a suffix of the host name (consisting of complete host name parts). - * Shorter domain suffixes are checked first, a single dot - * "." as "domain name" can be used to specify - * global rules applied to every URL. - * - * E.g., for "www.example.com" the rules given above are looked up in the - * following order: + * + * Host rules are evaluated before Domain rules. For Host + * rules the entire host name of a URL must match while the domain names in Domain + * rules are considered as matches if the domain is a suffix of the host name (consisting of + * complete host name parts). Shorter domain suffixes are checked first, a single dot ". + * " as "domain name" can be used to specify global rules applied to every + * URL. + * + *

E.g., for "www.example.com" the rules given above are looked up in the following order: + * *

    - *
  1. check "www.example.com" whether host-based rules exist and whether one of - * them matches
  2. - *
  3. check "www.example.com" for domain-based rules
  4. - *
  5. check "example.com" for domain-based rules
  6. - *
  7. check "com" for domain-based rules
  8. - *
  9. check for global rules ("Domain .")
  10. + *
  11. check "www.example.com" whether host-based rules exist and whether one of them matches + *
  12. check "www.example.com" for domain-based rules + *
  13. check "example.com" for domain-based rules + *
  14. check "com" for domain-based rules + *
  15. check for global rules ("Domain .") *
- * The first matching rule will reject the URL and no further rules are checked. - * If no rule matches the URL is accepted. URLs without a host name (e.g., - * file:/path/file.txt are checked for global rules only. URLs - * which fail to be parsed as {@link java.net.URL} are always rejected. - * - * For rules either the URL path (DenyPath) or path and query - * (DenyPathQuery) are checked whether the given - * {@link java.util.regex Java Regular expression} is found (see - * {@link java.util.regex.Matcher#find()}) in the URL path (and query). - * - * Rules are applied in the order of their definition. For better performance, - * regular expressions which are simpler/faster or match more URLs should be - * defined earlier. - * - * Comments in the rule file start with the # character and reach - * until the end of the line. - * - * The rules file is defined via the property urlfilter.fast.file, - * the default name is fast-urlfilter.txt. + * + * The first matching rule will reject the URL and no further rules are checked. If no rule matches + * the URL is accepted. URLs without a host name (e.g., file:/path/file.txt are checked + * for global rules only. URLs which fail to be parsed as {@link java.net.URL} are always rejected. + * + *

For rules either the URL path (DenyPath) or path and query (DenyPathQuery + * ) are checked whether the given {@link java.util.regex Java Regular expression} is found + * (see {@link java.util.regex.Matcher#find()}) in the URL path (and query). + * + *

Rules are applied in the order of their definition. For better performance, regular + * expressions which are simpler/faster or match more URLs should be defined earlier. + * + *

Comments in the rule file start with the # character and reach until the end of + * the line. + * + *

The rules file is defined via the property urlfilter.fast.file, the default name + * is fast-urlfilter.txt. */ -public class FastURLFilter extends URLFilter implements JSONResource { +public class FastURLFilter extends URLFilter implements JSONResource { - protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + protected static final Logger LOG = + LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); public static final String URLFILTER_FAST_FILE = "urlfilter.fast.file"; private Multimap hostRules = LinkedHashMultimap.create(); @@ -121,302 +114,320 @@ public class FastURLFilter extends URLFilter implements JSONResource { private String resourceFile; - private static final Pattern CATCH_ALL_RULE = Pattern.compile("^\\s*DenyPath(?:Query)?\\s+\\.[*?]\\s*$"); + private static final Pattern CATCH_ALL_RULE = + Pattern.compile("^\\s*DenyPath(?:Query)?\\s+\\.[*?]\\s*$"); private String resourceETAG; public void configure(@SuppressWarnings("rawtypes") Map stormConf, JsonNode filterParams) { - // read from conf first - int refreshRate = ConfUtils.getInt(stormConf, "fast.urlfilter.refresh", -1); - this.resourceFile = ConfUtils.getString(stormConf, "fast.urlfilter.file", null); - - // then from the param file (which needs recompiling in case of change) - if (filterParams != null) { - JsonNode node = filterParams.get("file"); - if (node != null && node.isTextual() && this.resourceFile == null) { - this.resourceFile = node.asText(); - } - node = filterParams.get("refresh"); - if (node != null && node.isInt() && refreshRate == -1) { - refreshRate = node.asInt(); - } - } - - try { - loadJSONResources(); - } catch (Exception e) { - LOG.error("Exception while loading resources", e); - } - - if (refreshRate != -1) { - LOG.info("Filter set to reload from {} every {} sec", getResourceFile(), refreshRate); - new Timer().schedule(new TimerTask() { - public void run() { - LOG.info("Reloading resources"); - try { - loadJSONResources(); - } catch (Exception e) { - LOG.error("Can't load resources", e); - } - } - }, refreshRate * 1000, refreshRate * 1000); - } + // read from conf first + int refreshRate = ConfUtils.getInt(stormConf, "fast.urlfilter.refresh", -1); + this.resourceFile = ConfUtils.getString(stormConf, "fast.urlfilter.file", null); + + // then from the param file (which needs recompiling in case of change) + if (filterParams != null) { + JsonNode node = filterParams.get("file"); + if (node != null && node.isTextual() && this.resourceFile == null) { + this.resourceFile = node.asText(); + } + node = filterParams.get("refresh"); + if (node != null && node.isInt() && refreshRate == -1) { + refreshRate = node.asInt(); + } + } + + try { + loadJSONResources(); + } catch (Exception e) { + LOG.error("Exception while loading resources", e); + } + + if (refreshRate != -1) { + LOG.info("Filter set to reload from {} every {} sec", getResourceFile(), refreshRate); + new Timer() + .schedule( + new TimerTask() { + public void run() { + LOG.info("Reloading resources"); + try { + loadJSONResources(); + } catch (Exception e) { + LOG.error("Can't load resources", e); + } + } + }, + refreshRate * 1000, + refreshRate * 1000); + } } /** * Load the resources from the JSON file in the uber jar or from S3 - * + * * @throws Exception - **/ + */ @Override public void loadJSONResources() throws Exception { - InputStream inputStream = null; - AmazonS3 s3client = null; - try { - if (getResourceFile().startsWith("s3://")) { - // try loading from S3 - s3client = AmazonS3ClientBuilder.standard().build(); - java.net.URI uri = new java.net.URI(getResourceFile()); - - String bucketName = uri.getHost(); - // remove the first "/" - String path = uri.getPath().substring(1); - - // optimisation - avoid a full reload if the resource has not changed - ObjectMetadata metadata = s3client.getObjectMetadata(bucketName, path); - final String ETAG = metadata.getETag(); - if (ETAG != null && ETAG.equals(resourceETAG)) { - LOG.info("Unchanged ETAG for {} - skipping reload", getResourceFile()); - return; - } else { - resourceETAG = ETAG; - } - - final S3Object object = s3client.getObject(new GetObjectRequest(bucketName, path)); - inputStream = object.getObjectContent(); - } else { - inputStream = getClass().getClassLoader().getResourceAsStream(getResourceFile()); - if (inputStream == null) { - LOG.error("Can't load conf from {}", getResourceFile()); - return; - } - } - if (getResourceFile().endsWith(".gz")) { - inputStream = new GZIPInputStream(inputStream); - } - - loadJSONResources(new BufferedInputStream(inputStream)); - } finally { - if (inputStream != null) { - inputStream.close(); - } - if (s3client != null) { - s3client.shutdown(); - } - } + InputStream inputStream = null; + AmazonS3 s3client = null; + try { + if (getResourceFile().startsWith("s3://")) { + // try loading from S3 + s3client = AmazonS3ClientBuilder.standard().build(); + java.net.URI uri = new java.net.URI(getResourceFile()); + + String bucketName = uri.getHost(); + // remove the first "/" + String path = uri.getPath().substring(1); + + // optimisation - avoid a full reload if the resource has not changed + ObjectMetadata metadata = s3client.getObjectMetadata(bucketName, path); + final String ETAG = metadata.getETag(); + if (ETAG != null && ETAG.equals(resourceETAG)) { + LOG.info("Unchanged ETAG for {} - skipping reload", getResourceFile()); + return; + } else { + resourceETAG = ETAG; + } + + final S3Object object = s3client.getObject(new GetObjectRequest(bucketName, path)); + inputStream = object.getObjectContent(); + } else { + inputStream = getClass().getClassLoader().getResourceAsStream(getResourceFile()); + if (inputStream == null) { + LOG.error("Can't load conf from {}", getResourceFile()); + return; + } + } + if (getResourceFile().endsWith(".gz")) { + inputStream = new GZIPInputStream(inputStream); + } + + loadJSONResources(new BufferedInputStream(inputStream)); + } finally { + if (inputStream != null) { + inputStream.close(); + } + if (s3client != null) { + s3client.shutdown(); + } + } } @Override public void loadJSONResources(InputStream inputStream) - throws JsonParseException, JsonMappingException, IOException { - long start = System.currentTimeMillis(); - - try (Reader r = new InputStreamReader(inputStream)) { - reloadRules(r); - } - - long end = System.currentTimeMillis(); - LOG.info("Loaded {} hostrules and {} domain rules in {} msec from {}", hostRules.size(), domainRules.size(), - (end - start), resourceFile); + throws JsonParseException, JsonMappingException, IOException { + long start = System.currentTimeMillis(); + + try (Reader r = new InputStreamReader(inputStream)) { + reloadRules(r); + } + + long end = System.currentTimeMillis(); + LOG.info( + "Loaded {} hostrules and {} domain rules in {} msec from {}", + hostRules.size(), + domainRules.size(), + (end - start), + resourceFile); } @Override public String getResourceFile() { - return resourceFile; + return resourceFile; } @Override public String filter(URL sourceUrl, Metadata sourceMetadata, String urlToFilter) { - synchronized (this) { - URL u; - - try { - u = new URL(urlToFilter); - } catch (Exception e) { - LOG.debug("Rejected {} because failed to parse as URL: {}", urlToFilter, e.getMessage()); - return null; - } - - String hostname = u.getHost(); - - // first check for host-specific rules - for (Rule rule : hostRules.get(hostname)) { - if (rule.match(u)) { - return null; - } - } - - // also look up domain rules for host name - for (Rule rule : domainRules.get(hostname)) { - if (rule.match(u)) { - return null; - } - } - - // check suffixes of host name from longer to shorter: - // subdomains, domain, top-level domain - int start = 0; - int pos; - while ((pos = hostname.indexOf('.', start)) != -1) { - start = pos + 1; - String domain = hostname.substring(start); - for (Rule rule : domainRules.get(domain)) { - if (rule.match(u)) { - return null; - } - } - } - - // finally check "global" rules defined for `Domain .` - for (Rule rule : domainRules.get(".")) { - if (rule.match(u)) { - return null; - } - } - - // no reject rules found - return urlToFilter; - } + synchronized (this) { + URL u; + + try { + u = new URL(urlToFilter); + } catch (Exception e) { + LOG.debug( + "Rejected {} because failed to parse as URL: {}", + urlToFilter, + e.getMessage()); + return null; + } + + String hostname = u.getHost(); + + // first check for host-specific rules + for (Rule rule : hostRules.get(hostname)) { + if (rule.match(u)) { + return null; + } + } + + // also look up domain rules for host name + for (Rule rule : domainRules.get(hostname)) { + if (rule.match(u)) { + return null; + } + } + + // check suffixes of host name from longer to shorter: + // subdomains, domain, top-level domain + int start = 0; + int pos; + while ((pos = hostname.indexOf('.', start)) != -1) { + start = pos + 1; + String domain = hostname.substring(start); + for (Rule rule : domainRules.get(domain)) { + if (rule.match(u)) { + return null; + } + } + } + + // finally check "global" rules defined for `Domain .` + for (Rule rule : domainRules.get(".")) { + if (rule.match(u)) { + return null; + } + } + + // no reject rules found + return urlToFilter; + } } private void reloadRules(Reader rules) throws IOException { - synchronized (this) { - domainRules.clear(); - hostRules.clear(); - - BufferedReader reader = new BufferedReader(rules); - - String current = null; - boolean host = false; - int lineno = 0; - - String line; - try { - while ((line = reader.readLine()) != null) { - lineno++; - line = line.trim(); - - if (line.indexOf("#") != -1) { - // strip comments - line = line.substring(0, line.indexOf("#")).trim(); - } - - if (StringUtils.isBlank(line)) { - continue; - } - - if (line.startsWith("Host")) { - host = true; - current = line.split("\\s+")[1]; - } else if (line.startsWith("Domain")) { - host = false; - current = line.split("\\s+")[1]; - } else { - if (current == null) { - continue; - } - - Rule rule = null; - try { - if (CATCH_ALL_RULE.matcher(line).matches()) { - rule = DenyAllRule.getInstance(); - } else if (line.startsWith("DenyPathQuery")) { - rule = new DenyPathQueryRule(line.split("\\s+")[1]); - } else if (line.startsWith("DenyPath")) { - rule = new DenyPathRule(line.split("\\s+")[1]); - } else { - LOG.warn("Problem reading rule on line {}: {}", lineno, line); - continue; - } - } catch (Exception e) { - LOG.warn("Problem reading rule on line {}: {} - {}", lineno, line, e.getMessage()); - continue; - } - - if (host) { - LOG.trace("Adding host rule [{}] [{}]", current, rule); - hostRules.put(current, rule); - } else { - LOG.trace("Adding domain rule [{}] [{}]", current, rule); - domainRules.put(current, rule); - } - } - } - - } catch (IOException e) { - LOG.warn("Caught exception while reading rules file at line {}: {}", lineno, e.getMessage()); - throw e; - } - } + synchronized (this) { + domainRules.clear(); + hostRules.clear(); + + BufferedReader reader = new BufferedReader(rules); + + String current = null; + boolean host = false; + int lineno = 0; + + String line; + try { + while ((line = reader.readLine()) != null) { + lineno++; + line = line.trim(); + + if (line.indexOf("#") != -1) { + // strip comments + line = line.substring(0, line.indexOf("#")).trim(); + } + + if (StringUtils.isBlank(line)) { + continue; + } + + if (line.startsWith("Host")) { + host = true; + current = line.split("\\s+")[1]; + } else if (line.startsWith("Domain")) { + host = false; + current = line.split("\\s+")[1]; + } else { + if (current == null) { + continue; + } + + Rule rule = null; + try { + if (CATCH_ALL_RULE.matcher(line).matches()) { + rule = DenyAllRule.getInstance(); + } else if (line.startsWith("DenyPathQuery")) { + rule = new DenyPathQueryRule(line.split("\\s+")[1]); + } else if (line.startsWith("DenyPath")) { + rule = new DenyPathRule(line.split("\\s+")[1]); + } else { + LOG.warn("Problem reading rule on line {}: {}", lineno, line); + continue; + } + } catch (Exception e) { + LOG.warn( + "Problem reading rule on line {}: {} - {}", + lineno, + line, + e.getMessage()); + continue; + } + + if (host) { + LOG.trace("Adding host rule [{}] [{}]", current, rule); + hostRules.put(current, rule); + } else { + LOG.trace("Adding domain rule [{}] [{}]", current, rule); + domainRules.put(current, rule); + } + } + } + + } catch (IOException e) { + LOG.warn( + "Caught exception while reading rules file at line {}: {}", + lineno, + e.getMessage()); + throw e; + } + } } public static class Rule { - protected Pattern pattern; + protected Pattern pattern; - Rule() { - } + Rule() {} - public Rule(String regex) { - pattern = Pattern.compile(regex); - } + public Rule(String regex) { + pattern = Pattern.compile(regex); + } - public boolean match(URL url) { - return pattern.matcher(url.toString()).find(); - } + public boolean match(URL url) { + return pattern.matcher(url.toString()).find(); + } - public String toString() { - return pattern.toString(); - } + public String toString() { + return pattern.toString(); + } } public static class DenyPathRule extends Rule { - public DenyPathRule(String regex) { - super(regex); - } - - public boolean match(URL url) { - String haystack = url.getPath(); - return pattern.matcher(haystack).find(); - } + public DenyPathRule(String regex) { + super(regex); + } + + public boolean match(URL url) { + String haystack = url.getPath(); + return pattern.matcher(haystack).find(); + } } /** Rule for DenyPath .* or DenyPath .? */ public static class DenyAllRule extends Rule { - private static Rule instance = new DenyAllRule("."); + private static Rule instance = new DenyAllRule("."); - private DenyAllRule(String regex) { - super(regex); - } + private DenyAllRule(String regex) { + super(regex); + } - public static Rule getInstance() { - return instance; - } + public static Rule getInstance() { + return instance; + } - public boolean match(URL url) { - return true; - } + public boolean match(URL url) { + return true; + } } public static class DenyPathQueryRule extends Rule { - public DenyPathQueryRule(String regex) { - super(regex); - } - - public boolean match(URL url) { - String haystack = url.getFile(); - return pattern.matcher(haystack).find(); - } + public DenyPathQueryRule(String regex) { + super(regex); + } + + public boolean match(URL url) { + String haystack = url.getFile(); + return pattern.matcher(haystack).find(); + } } } diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/ContentDetector.java b/src/main/java/org/commoncrawl/stormcrawler/news/ContentDetector.java index 1c3a4a5..9979781 100644 --- a/src/main/java/org/commoncrawl/stormcrawler/news/ContentDetector.java +++ b/src/main/java/org/commoncrawl/stormcrawler/news/ContentDetector.java @@ -13,33 +13,28 @@ */ package org.commoncrawl.stormcrawler.news; +import com.google.common.primitives.Bytes; import java.nio.charset.StandardCharsets; import java.util.Arrays; -import com.google.common.primitives.Bytes; - public class ContentDetector { protected byte[][][] clues; protected int maxOffset; /** - * Set up detector to detect content sniffing for a set of clue strings in a - * prefix of the binary content. + * Set up detector to detect content sniffing for a set of clue strings in a prefix of the + * binary content. * - * @param clues - * nested list of literal clues. Outer list defines an OR-group, - * inner list contained ANDed clues required to match all, e.g. - * the following definition would match if either - * "clue1" and "and_clue2" are matched, or - * alternatively "or_clue3" is found + * @param clues nested list of literal clues. Outer list defines an OR-group, inner list + * contained ANDed clues required to match all, e.g. the following definition would match if + * either "clue1" and "and_clue2" are matched, or alternatively + * "or_clue3" is found + *

+     *                  { { clue1, and_clue2 }, { or_clue3 } }
+     *                  
* - *
-     *            { { clue1, and_clue2 }, { or_clue3 } }
-     *            
- * - * @param maxOffset - * max. offset of content prefix checked for clues + * @param maxOffset max. offset of content prefix checked for clues */ public ContentDetector(String[][] clues, int maxOffset) { this.maxOffset = maxOffset; @@ -60,8 +55,7 @@ public int getFirstMatch(byte[] content) { for (int i = 0; i < clues.length; i++) { byte[][] group = clues[i]; for (byte[] clue : group) { - if (Bytes.indexOf(beginning, clue) == -1) - continue OR; + if (Bytes.indexOf(beginning, clue) == -1) continue OR; } // success, all members of one group matched return i; @@ -72,5 +66,4 @@ public int getFirstMatch(byte[] content) { public boolean matches(byte[] content) { return (getFirstMatch(content) >= 0); } - } diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java b/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java index 3a3c018..cc2d85a 100644 --- a/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java +++ b/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to DigitalPebble Ltd under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -14,13 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.commoncrawl.stormcrawler.news; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; - import org.apache.storm.topology.BoltDeclarer; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields; @@ -43,116 +41,126 @@ import org.apache.stormcrawler.warc.WARCHdfsBolt; import org.slf4j.LoggerFactory; -/** - * Dummy topology to play with the spouts and bolts on OpenSearch - */ +/** Dummy topology to play with the spouts and bolts on OpenSearch */ public class CrawlTopology extends ConfigurableTopology { private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(CrawlTopology.class); public static void main(String[] args) throws Exception { - ConfigurableTopology.start(new CrawlTopology(), args); + ConfigurableTopology.start(new CrawlTopology(), args); } @Override protected int run(String[] args) { - TopologyBuilder builder = new TopologyBuilder(); + TopologyBuilder builder = new TopologyBuilder(); - int numWorkers = ConfUtils.getInt(getConf(), "topology.workers", 1); + int numWorkers = ConfUtils.getInt(getConf(), "topology.workers", 1); - // set to the real number of shards ONLY if es.status.routing is set to - // true in the configuration - int numShards = 16; + // set to the real number of shards ONLY if es.status.routing is set to + // true in the configuration + int numShards = 16; - if (args.length >= 2) { - // arguments include seed directory and file pattern - LOG.info("Injecting seeds from {} by pattern {}", args[0], args[1]); - builder.setSpout("filespout", new FileSpout(args[0], args[1], true)); - Fields key = new Fields("url"); + if (args.length >= 2) { + // arguments include seed directory and file pattern + LOG.info("Injecting seeds from {} by pattern {}", args[0], args[1]); + builder.setSpout("filespout", new FileSpout(args[0], args[1], true)); + Fields key = new Fields("url"); - builder.setBolt("filter", new URLFilterBolt()).fieldsGrouping("filespout", Constants.StatusStreamName, key); - } + builder.setBolt("filter", new URLFilterBolt()) + .fieldsGrouping("filespout", Constants.StatusStreamName, key); + } - builder.setSpout("spout", new AggregationSpout(), numShards); + builder.setSpout("spout", new AggregationSpout(), numShards); - builder.setBolt("prefilter", new PreFilterBolt("pre-urlfilters.json"), numWorkers).shuffleGrouping("spout"); + builder.setBolt("prefilter", new PreFilterBolt("pre-urlfilters.json"), numWorkers) + .shuffleGrouping("spout"); - builder.setBolt("partitioner", new URLPartitionerBolt(), numWorkers).shuffleGrouping("prefilter"); + builder.setBolt("partitioner", new URLPartitionerBolt(), numWorkers) + .shuffleGrouping("prefilter"); - builder.setBolt("fetch", new FetcherBolt(), numWorkers).fieldsGrouping("partitioner", new Fields("key")); + builder.setBolt("fetch", new FetcherBolt(), numWorkers) + .fieldsGrouping("partitioner", new Fields("key")); - builder.setBolt("sitemap", new NewsSiteMapParserBolt(), numWorkers).setNumTasks(2) - .localOrShuffleGrouping("fetch"); + builder.setBolt("sitemap", new NewsSiteMapParserBolt(), numWorkers) + .setNumTasks(2) + .localOrShuffleGrouping("fetch"); - builder.setBolt("feed", new FeedParserBolt(), numWorkers).setNumTasks(4).localOrShuffleGrouping("sitemap"); + builder.setBolt("feed", new FeedParserBolt(), numWorkers) + .setNumTasks(4) + .localOrShuffleGrouping("sitemap"); - // don't need to parse the pages but need to update their status - builder.setBolt("ssb", new DummyIndexer(), numWorkers).localOrShuffleGrouping("feed"); + // don't need to parse the pages but need to update their status + builder.setBolt("ssb", new DummyIndexer(), numWorkers).localOrShuffleGrouping("feed"); - WARCHdfsBolt warcbolt = getWarcBolt("CC-NEWS"); + WARCHdfsBolt warcbolt = getWarcBolt("CC-NEWS"); - // take it from feed default output so that the feed files themselves - // don't get included - unless we want them too of course! - builder.setBolt("warc", warcbolt, numWorkers).localOrShuffleGrouping("feed"); - - final Fields furl = new Fields("url"); + // take it from feed default output so that the feed files themselves + // don't get included - unless we want them too of course! + builder.setBolt("warc", warcbolt, numWorkers).localOrShuffleGrouping("feed"); - BoltDeclarer statusBolt = builder.setBolt("status", new StatusUpdaterBolt(), numWorkers) - .fieldsGrouping("fetch", Constants.StatusStreamName, furl) - .fieldsGrouping("sitemap", Constants.StatusStreamName, furl) - .fieldsGrouping("feed", Constants.StatusStreamName, furl) - .fieldsGrouping("ssb", Constants.StatusStreamName, furl) - .fieldsGrouping("prefilter", Constants.StatusStreamName, furl); - - if (args.length >= 2) { - statusBolt.customGrouping("filter", Constants.StatusStreamName, new URLStreamGrouping()); - } - statusBolt.setNumTasks(numShards); + final Fields furl = new Fields("url"); - return submit(conf, builder); + BoltDeclarer statusBolt = + builder.setBolt("status", new StatusUpdaterBolt(), numWorkers) + .fieldsGrouping("fetch", Constants.StatusStreamName, furl) + .fieldsGrouping("sitemap", Constants.StatusStreamName, furl) + .fieldsGrouping("feed", Constants.StatusStreamName, furl) + .fieldsGrouping("ssb", Constants.StatusStreamName, furl) + .fieldsGrouping("prefilter", Constants.StatusStreamName, furl); + + if (args.length >= 2) { + statusBolt.customGrouping( + "filter", Constants.StatusStreamName, new URLStreamGrouping()); + } + statusBolt.setNumTasks(numShards); + + return submit(conf, builder); } protected WARCHdfsBolt getWarcBolt(String filePrefix) { - // path is absolute - String warcFilePath = ConfUtils.getString(getConf(), "warc.dir", "/data/warc"); - - WARCFileNameFormat fileNameFormat = new WARCFileNameFormat(); - fileNameFormat.withPath(warcFilePath); - fileNameFormat.withPrefix(filePrefix); - - Map fields = new LinkedHashMap<>(); - fields.put("software", "StormCrawler 2.10 https://stormcrawler.net/"); - fields.put("description", "News crawl for Common Crawl"); - String userAgent = AbstractHttpProtocol.getAgentString(getConf()); - fields.put("http-header-user-agent", userAgent); - fields.put("http-header-from", ConfUtils.getString(getConf(), "http.agent.email")); - String robotsTxtParser = "checked by crawler-commons " + crawlercommons.CrawlerCommons.getVersion() - + " (https://github.com/crawler-commons/crawler-commons)"; - fields.put("robots", robotsTxtParser); - fields.put("format", "WARC File Format 1.1"); - fields.put("conformsTo", "https://iipc.github.io/warc-specifications/specifications/warc-format/warc-1.1/"); - - WARCHdfsBolt warcbolt = (WARCHdfsBolt) new WARCHdfsBolt(); - warcbolt.withConfigKey("warc"); - warcbolt.withFileNameFormat(fileNameFormat); - warcbolt.withHeader(fields); - warcbolt.withRequestRecords(); - - // use RawLocalFileSystem (instead of ChecksumFileSystem) to avoid that - // WARC files are truncated if the topology is stopped because of a - // delayed sync of the default ChecksumFileSystem - Map hdfsConf = new HashMap<>(); - hdfsConf.put("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem"); - getConf().put("warc", hdfsConf); - - // will rotate if reaches size or time limit - int maxMB = ConfUtils.getInt(getConf(), "warc.rotation.policy.max-mb", 1024); - int maxMinutes = ConfUtils.getInt(getConf(), "warc.rotation.policy.max-minutes", 1440); - FileTimeSizeRotationPolicy rotpol = new FileTimeSizeRotationPolicy(maxMB, Units.MB); - rotpol.setTimeRotationInterval(maxMinutes, FileTimeSizeRotationPolicy.TimeUnit.MINUTES); - warcbolt.withRotationPolicy(rotpol); - - return warcbolt; + // path is absolute + String warcFilePath = ConfUtils.getString(getConf(), "warc.dir", "/data/warc"); + + WARCFileNameFormat fileNameFormat = new WARCFileNameFormat(); + fileNameFormat.withPath(warcFilePath); + fileNameFormat.withPrefix(filePrefix); + + Map fields = new LinkedHashMap<>(); + fields.put("software", "StormCrawler 2.10 https://stormcrawler.net/"); + fields.put("description", "News crawl for Common Crawl"); + String userAgent = AbstractHttpProtocol.getAgentString(getConf()); + fields.put("http-header-user-agent", userAgent); + fields.put("http-header-from", ConfUtils.getString(getConf(), "http.agent.email")); + String robotsTxtParser = + "checked by crawler-commons " + + crawlercommons.CrawlerCommons.getVersion() + + " (https://github.com/crawler-commons/crawler-commons)"; + fields.put("robots", robotsTxtParser); + fields.put("format", "WARC File Format 1.1"); + fields.put( + "conformsTo", + "https://iipc.github.io/warc-specifications/specifications/warc-format/warc-1.1/"); + + WARCHdfsBolt warcbolt = (WARCHdfsBolt) new WARCHdfsBolt(); + warcbolt.withConfigKey("warc"); + warcbolt.withFileNameFormat(fileNameFormat); + warcbolt.withHeader(fields); + warcbolt.withRequestRecords(); + + // use RawLocalFileSystem (instead of ChecksumFileSystem) to avoid that + // WARC files are truncated if the topology is stopped because of a + // delayed sync of the default ChecksumFileSystem + Map hdfsConf = new HashMap<>(); + hdfsConf.put("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem"); + getConf().put("warc", hdfsConf); + + // will rotate if reaches size or time limit + int maxMB = ConfUtils.getInt(getConf(), "warc.rotation.policy.max-mb", 1024); + int maxMinutes = ConfUtils.getInt(getConf(), "warc.rotation.policy.max-minutes", 1440); + FileTimeSizeRotationPolicy rotpol = new FileTimeSizeRotationPolicy(maxMB, Units.MB); + rotpol.setTimeRotationInterval(maxMinutes, FileTimeSizeRotationPolicy.TimeUnit.MINUTES); + warcbolt.withRotationPolicy(rotpol); + + return warcbolt; } - } diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/FeedDetectorBolt.java b/src/main/java/org/commoncrawl/stormcrawler/news/FeedDetectorBolt.java index c365525..e607dc2 100644 --- a/src/main/java/org/commoncrawl/stormcrawler/news/FeedDetectorBolt.java +++ b/src/main/java/org/commoncrawl/stormcrawler/news/FeedDetectorBolt.java @@ -14,13 +14,11 @@ package org.commoncrawl.stormcrawler.news; import java.util.Map; - +import org.apache.http.HttpHeaders; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; -import org.slf4j.LoggerFactory; - import org.apache.stormcrawler.Constants; import org.apache.stormcrawler.Metadata; import org.apache.stormcrawler.bolt.FeedParserBolt; @@ -29,28 +27,23 @@ import org.apache.stormcrawler.parse.ParseFilters; import org.apache.stormcrawler.parse.ParseResult; import org.apache.stormcrawler.persistence.Status; -import org.apache.http.HttpHeaders; +import org.slf4j.LoggerFactory; /** Detect RSS and Atom feeds, but do not parse and extract links */ @SuppressWarnings("serial") public class FeedDetectorBolt extends FeedParserBolt { - private static final org.slf4j.Logger LOG = LoggerFactory - .getLogger(FeedDetectorBolt.class); + private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(FeedDetectorBolt.class); - public static final String[] mimeTypeClues = { - "rss+xml", "atom+xml", "text/rss" - }; + public static final String[] mimeTypeClues = {"rss+xml", "atom+xml", "text/rss"}; - public static String[][] contentClues = { { "<{}> for {}", - ct, url); + LOG.info("Feed detected from content type <{}> for {}", ct, url); break; } } @@ -90,8 +82,8 @@ public void execute(Tuple tuple) { parseData.setMetadata(metadata); parseFilters.filter(url, content, null, parse); // emit status - collector.emit(Constants.StatusStreamName, tuple, - new Values(url, metadata, Status.FETCHED)); + collector.emit( + Constants.StatusStreamName, tuple, new Values(url, metadata, Status.FETCHED)); } else { // pass on collector.emit(tuple, tuple.getValues()); @@ -100,11 +92,9 @@ public void execute(Tuple tuple) { } @Override - @SuppressWarnings({ "rawtypes" }) - public void prepare(Map stormConf, TopologyContext context, - OutputCollector collect) { + @SuppressWarnings({"rawtypes"}) + public void prepare(Map stormConf, TopologyContext context, OutputCollector collect) { super.prepare(stormConf, context, collect); parseFilters = ParseFilters.fromConf(stormConf); } - } diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/NewsSiteMapParserBolt.java b/src/main/java/org/commoncrawl/stormcrawler/news/NewsSiteMapParserBolt.java index 3c4cf55..187d086 100644 --- a/src/main/java/org/commoncrawl/stormcrawler/news/NewsSiteMapParserBolt.java +++ b/src/main/java/org/commoncrawl/stormcrawler/news/NewsSiteMapParserBolt.java @@ -15,6 +15,18 @@ import static org.apache.stormcrawler.Constants.StatusStreamName; +import crawlercommons.sitemaps.AbstractSiteMap; +import crawlercommons.sitemaps.Namespace; +import crawlercommons.sitemaps.SiteMap; +import crawlercommons.sitemaps.SiteMapIndex; +import crawlercommons.sitemaps.SiteMapParser; +import crawlercommons.sitemaps.SiteMapURL; +import crawlercommons.sitemaps.SiteMapURL.ChangeFrequency; +import crawlercommons.sitemaps.UnknownFormatException; +import crawlercommons.sitemaps.extension.Extension; +import crawlercommons.sitemaps.extension.ExtensionMetadata; +import crawlercommons.sitemaps.extension.LinkAttributes; +import crawlercommons.sitemaps.extension.NewsAttributes; import java.io.IOException; import java.net.URL; import java.util.ArrayList; @@ -24,7 +36,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; - import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpHeaders; import org.apache.storm.metric.api.MeanReducer; @@ -46,56 +57,45 @@ import org.apache.stormcrawler.util.ConfUtils; import org.slf4j.LoggerFactory; -import crawlercommons.sitemaps.AbstractSiteMap; -import crawlercommons.sitemaps.Namespace; -import crawlercommons.sitemaps.SiteMap; -import crawlercommons.sitemaps.SiteMapIndex; -import crawlercommons.sitemaps.SiteMapParser; -import crawlercommons.sitemaps.SiteMapURL; -import crawlercommons.sitemaps.SiteMapURL.ChangeFrequency; -import crawlercommons.sitemaps.UnknownFormatException; -import crawlercommons.sitemaps.extension.Extension; -import crawlercommons.sitemaps.extension.ExtensionMetadata; -import crawlercommons.sitemaps.extension.LinkAttributes; -import crawlercommons.sitemaps.extension.NewsAttributes; - - /** - * ParserBolt for news + * ParserBolt for news * sitemaps. */ @SuppressWarnings("serial") public class NewsSiteMapParserBolt extends SiteMapParserBolt { // TODO: - // this is a modified copy of c.d.s.bolt.SiteMapParserBolt - // - make parent class extensible and overridable - // modifications: - // - detect and process only Google news sitemaps - // - or a sitemapindex because some subsitemaps may - // be news sitemaps - // - pass "isSitemapNews" to status metadata + // this is a modified copy of c.d.s.bolt.SiteMapParserBolt + // - make parent class extensible and overridable + // modifications: + // - detect and process only Google news sitemaps + // - or a sitemapindex because some subsitemaps may + // be news sitemaps + // - pass "isSitemapNews" to status metadata public static enum SitemapType { - NEWS, INDEX, SITEMAP + NEWS, + INDEX, + SITEMAP } public static final String isSitemapNewsKey = "isSitemapNews"; public static final String isSitemapIndexKey = "isSitemapIndex"; + /** - * A sitemap (not necessarily a news sitemap) which is verified to contain - * links to news articles. Necessary to crawl news sites which provide a - * sitemap but neither a news feed or sitemap. + * A sitemap (not necessarily a news sitemap) which is verified to contain links to news + * articles. Necessary to crawl news sites which provide a sitemap but neither a news feed or + * sitemap. */ public static final String isSitemapVerifiedKey = "isSitemapVerified"; - private static final org.slf4j.Logger LOG = LoggerFactory - .getLogger(NewsSiteMapParserBolt.class); + private static final org.slf4j.Logger LOG = + LoggerFactory.getLogger(NewsSiteMapParserBolt.class); /* content clues for news sitemaps, sitemap indexes or any sitemaps */ public static String[][] contentClues; public static int contentCluesSitemapNewsMatchUpTo = -1; public static int contentCluesSitemapIndexMatchUpTo = -1; + static { int cluesSize = Namespace.NEWS.length + 1 + 1 + Namespace.SITEMAP_LEGACY.length; contentClues = new String[cluesSize][1]; @@ -129,7 +129,7 @@ public static enum SitemapType { private ReducedMetric averagedMetrics; - /** Delay in minutes used for scheduling sub-sitemaps **/ + /** Delay in minutes used for scheduling sub-sitemaps * */ private int scheduleSitemapsWithDelay = -1; @Override @@ -140,14 +140,10 @@ public void execute(Tuple tuple) { byte[] content = tuple.getBinaryByField("content"); String url = tuple.getStringByField("url"); - boolean isSitemap = Boolean.valueOf( - metadata.getFirstValue(SiteMapParserBolt.isSitemapKey)); - boolean isNewsSitemap = Boolean - .valueOf(metadata.getFirstValue(isSitemapNewsKey)); - boolean isSitemapIndex = Boolean - .valueOf(metadata.getFirstValue(isSitemapIndexKey)); - boolean isSitemapVerified = Boolean - .valueOf(metadata.getFirstValue(isSitemapVerifiedKey)); + boolean isSitemap = Boolean.valueOf(metadata.getFirstValue(SiteMapParserBolt.isSitemapKey)); + boolean isNewsSitemap = Boolean.valueOf(metadata.getFirstValue(isSitemapNewsKey)); + boolean isSitemapIndex = Boolean.valueOf(metadata.getFirstValue(isSitemapIndexKey)); + boolean isSitemapVerified = Boolean.valueOf(metadata.getFirstValue(isSitemapVerifiedKey)); if (sniffContent) { SitemapType type = detectContent(url, content); @@ -183,14 +179,16 @@ public void execute(Tuple tuple) { if (isNewsSitemap || isSitemapIndex || isSitemapVerified) { /* - * remove the isSitemap key from metadata to avoid that the default - * sitemap fetch interval is applied to news sitemaps, sitemap - * indexes and verified sitemaps + * remove the isSitemap key from metadata to avoid that the default sitemap + * fetch interval is applied to news sitemaps, sitemap indexes and verified + * sitemaps */ metadata.remove(isSitemapKey); } else { if (isSitemap) { - collector.emit(Constants.StatusStreamName, tuple, + collector.emit( + Constants.StatusStreamName, + tuple, new Values(url, metadata, Status.FETCHED)); } else { // not a sitemap, just pass it on @@ -217,8 +215,8 @@ public void execute(Tuple tuple) { metadata.setValue(Constants.STATUS_ERROR_SOURCE, "sitemap parsing"); metadata.setValue(Constants.STATUS_ERROR_MESSAGE, errorMessage); metadata.remove("numLinks"); - collector.emit(Constants.StatusStreamName, tuple, new Values(url, - metadata, Status.ERROR)); + collector.emit( + Constants.StatusStreamName, tuple, new Values(url, metadata, Status.ERROR)); collector.ack(tuple); return; } @@ -232,15 +230,12 @@ public void execute(Tuple tuple) { parseFilters.filter(url, content, null, parse); } catch (RuntimeException e) { - String errorMessage = "Exception while running parse filters on " - + url + ": " + e; + String errorMessage = "Exception while running parse filters on " + url + ": " + e; LOG.error(errorMessage); - metadata.setValue(Constants.STATUS_ERROR_SOURCE, - "content filtering"); + metadata.setValue(Constants.STATUS_ERROR_SOURCE, "content filtering"); metadata.setValue(Constants.STATUS_ERROR_MESSAGE, errorMessage); metadata.remove("numLinks"); - collector.emit(StatusStreamName, tuple, new Values(url, metadata, - Status.ERROR)); + collector.emit(StatusStreamName, tuple, new Values(url, metadata, Status.ERROR)); collector.ack(tuple); return; } @@ -263,8 +258,7 @@ public void execute(Tuple tuple) { ol.getMetadata().setValue(isSitemapVerifiedKey, "true"); } } - Values v = new Values(ol.getTargetURL(), ol.getMetadata(), - Status.DISCOVERED); + Values v = new Values(ol.getTargetURL(), ol.getMetadata(), Status.DISCOVERED); collector.emit(Constants.StatusStreamName, tuple, v); } @@ -272,8 +266,8 @@ public void execute(Tuple tuple) { metadata.setValue("numLinks", String.valueOf(outlinks.size())); // marking the main URL as successfully fetched - collector.emit(Constants.StatusStreamName, tuple, new Values(url, - metadata, Status.FETCHED)); + collector.emit( + Constants.StatusStreamName, tuple, new Values(url, metadata, Status.FETCHED)); collector.ack(tuple); } @@ -291,12 +285,10 @@ public SitemapType detectContent(String url, byte[] content) { if (match >= 0) { // a sitemap, need to detect type of sitemap if (match <= contentCluesSitemapNewsMatchUpTo) { - LOG.info("{} detected as news sitemap based on content", - url); + LOG.info("{} detected as news sitemap based on content", url); return SitemapType.NEWS; } else if (match <= contentCluesSitemapIndexMatchUpTo) { - LOG.info("{} detected as sitemap index based on content", - url); + LOG.info("{} detected as sitemap index based on content", url); return SitemapType.INDEX; } else { return SitemapType.SITEMAP; @@ -317,12 +309,15 @@ private boolean recentlyModified(Date lastModified) { return true; } - protected AbstractSiteMap parseSiteMap(String url, byte[] content, - String contentType, Metadata parentMetadata, List links) + protected AbstractSiteMap parseSiteMap( + String url, + byte[] content, + String contentType, + Metadata parentMetadata, + List links) throws UnknownFormatException, IOException { - SiteMapParser parser = new SiteMapParser(strictModeSitemaps, - allowPartialSitemaps); + SiteMapParser parser = new SiteMapParser(strictModeSitemaps, allowPartialSitemaps); parser.setStrictNamespace(true); parser.addAcceptedNamespace(Namespace.SITEMAP_LEGACY); parser.addAcceptedNamespace(Namespace.EMPTY); @@ -334,8 +329,7 @@ protected AbstractSiteMap parseSiteMap(String url, byte[] content, long start = System.currentTimeMillis(); AbstractSiteMap siteMap; // let the parser guess what the mimetype is - if (StringUtils.isBlank(contentType) - || contentType.contains("octet-stream")) { + if (StringUtils.isBlank(contentType) || contentType.contains("octet-stream")) { siteMap = parser.parseSiteMap(content, sURL); } else { siteMap = parser.parseSiteMap(contentType, content, sURL); @@ -351,8 +345,8 @@ protected AbstractSiteMap parseSiteMap(String url, byte[] content, Collection subsitemaps = smi.getSitemaps(); int delay = 0; /* - * keep the subsitemaps as outlinks they will be fetched and parsed - * in the following steps + * keep the subsitemaps as outlinks they will be fetched and parsed in the + * following steps */ Iterator iter = subsitemaps.iterator(); while (iter.hasNext()) { @@ -365,13 +359,21 @@ protected AbstractSiteMap parseSiteMap(String url, byte[] content, linksSkippedNotRecentlyModified++; LOG.debug( "{} has a modified date {} which is more than {} hours old", - target, lastModified.toString(), + target, + lastModified.toString(), filterHoursSinceModified); continue; } - Outlink ol = filterOutlink(sURL, target, parentMetadata, - isSitemapKey, "true", isSitemapNewsKey, "false"); + Outlink ol = + filterOutlink( + sURL, + target, + parentMetadata, + isSitemapKey, + "true", + isSitemapNewsKey, + "false"); if (ol == null) { continue; } @@ -379,9 +381,8 @@ protected AbstractSiteMap parseSiteMap(String url, byte[] content, // add a delay if (this.scheduleSitemapsWithDelay > 0) { if (delay > 0) { - ol.getMetadata().setValue( - DefaultScheduler.DELAY_METADATA, - Integer.toString(delay)); + ol.getMetadata() + .setValue(DefaultScheduler.DELAY_METADATA, Integer.toString(delay)); } delay += this.scheduleSitemapsWithDelay; } @@ -389,15 +390,19 @@ protected AbstractSiteMap parseSiteMap(String url, byte[] content, links.add(ol); LOG.debug("{} : [sitemap] {}", url, target); } - LOG.info("Sitemap index (found {} sitemaps, {} skipped): {}", - linksFound, linksSkippedNotRecentlyModified, url); + LOG.info( + "Sitemap index (found {} sitemaps, {} skipped): {}", + linksFound, + linksSkippedNotRecentlyModified, + url); } // sitemap files else { SiteMap sm = (SiteMap) siteMap; Collection sitemapURLs = sm.getSiteMapUrls(); Iterator iter = sitemapURLs.iterator(); - sitemap_urls: while (iter.hasNext()) { + sitemap_urls: + while (iter.hasNext()) { linksFound++; SiteMapURL smurl = iter.next(); // TODO handle priority in metadata @@ -414,11 +419,12 @@ protected AbstractSiteMap parseSiteMap(String url, byte[] content, linksSkippedNotRecentlyModified++; LOG.debug( "{} has a modified date {} which is more than {} hours old", - target, lastModified, filterHoursSinceModified); + target, + lastModified, + filterHoursSinceModified); continue; } - ExtensionMetadata[] newsAttrs = smurl - .getAttributesForExtension(Extension.NEWS); + ExtensionMetadata[] newsAttrs = smurl.getAttributesForExtension(Extension.NEWS); if (newsAttrs != null) { // filter based on news publication date // 2008-12-23 @@ -429,7 +435,9 @@ protected AbstractSiteMap parseSiteMap(String url, byte[] content, linksSkippedNotRecentlyModified++; LOG.debug( "{} has a news publication date {} which is more than {} hours old", - target, pubDate, filterHoursSinceModified); + target, + pubDate, + filterHoursSinceModified); continue sitemap_urls; } } @@ -437,8 +445,7 @@ protected AbstractSiteMap parseSiteMap(String url, byte[] content, } // add alternative language links - ExtensionMetadata[] linkAttrs = smurl - .getAttributesForExtension(Extension.LINKS); + ExtensionMetadata[] linkAttrs = smurl.getAttributesForExtension(Extension.LINKS); if (linkAttrs != null) { for (ExtensionMetadata attr : linkAttrs) { LinkAttributes linkAttr = (LinkAttributes) attr; @@ -451,17 +458,30 @@ protected AbstractSiteMap parseSiteMap(String url, byte[] content, // skip href links duplicating sitemap URL continue; } - Outlink ol = filterOutlink(sURL, hrefStr, - parentMetadata, isSitemapKey, "false", - isSitemapNewsKey, "false"); + Outlink ol = + filterOutlink( + sURL, + hrefStr, + parentMetadata, + isSitemapKey, + "false", + isSitemapNewsKey, + "false"); if (ol != null) { links.add(ol); } } } - Outlink ol = filterOutlink(sURL, target, parentMetadata, - isSitemapKey, "false", isSitemapNewsKey, "false"); + Outlink ol = + filterOutlink( + sURL, + target, + parentMetadata, + isSitemapKey, + "false", + isSitemapNewsKey, + "false"); if (ol == null) { continue; } @@ -469,34 +489,33 @@ protected AbstractSiteMap parseSiteMap(String url, byte[] content, links.add(ol); LOG.debug("{} : [sitemap] {}", url, target); } - LOG.info("Sitemap (found {} links, {} skipped): {}", linksFound, - linksSkippedNotRecentlyModified, url); + LOG.info( + "Sitemap (found {} links, {} skipped): {}", + linksFound, + linksSkippedNotRecentlyModified, + url); } return siteMap; } @Override - @SuppressWarnings({ "rawtypes", "unchecked" }) - public void prepare(Map stormConf, TopologyContext context, - OutputCollector collector) { + @SuppressWarnings({"rawtypes", "unchecked"}) + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { super.prepare(stormConf, context, collector); - sniffContent = ConfUtils.getBoolean(stormConf, - "sitemap.sniffContent", false); - filterHoursSinceModified = ConfUtils.getInt(stormConf, - "sitemap.filter.hours.since.modified", -1); + sniffContent = ConfUtils.getBoolean(stormConf, "sitemap.sniffContent", false); + filterHoursSinceModified = + ConfUtils.getInt(stormConf, "sitemap.filter.hours.since.modified", -1); parseFilters = ParseFilters.fromConf(stormConf); - int maxOffsetGuess = ConfUtils.getInt(stormConf, "sitemap.offset.guess", - 1024); - contentDetector = new ContentDetector( - NewsSiteMapParserBolt.contentClues, maxOffsetGuess); - rssContentDetector = new ContentDetector( - FeedDetectorBolt.contentClues, maxOffsetGuess); - averagedMetrics = context.registerMetric( - "news_sitemap_average_processing_time", - new ReducedMetric(new MeanReducer()), 30); - scheduleSitemapsWithDelay = ConfUtils.getInt(stormConf, - "sitemap.schedule.delay", scheduleSitemapsWithDelay); + int maxOffsetGuess = ConfUtils.getInt(stormConf, "sitemap.offset.guess", 1024); + contentDetector = new ContentDetector(NewsSiteMapParserBolt.contentClues, maxOffsetGuess); + rssContentDetector = new ContentDetector(FeedDetectorBolt.contentClues, maxOffsetGuess); + averagedMetrics = + context.registerMetric( + "news_sitemap_average_processing_time", + new ReducedMetric(new MeanReducer()), + 30); + scheduleSitemapsWithDelay = + ConfUtils.getInt(stormConf, "sitemap.schedule.delay", scheduleSitemapsWithDelay); } - } diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/PreFilterBolt.java b/src/main/java/org/commoncrawl/stormcrawler/news/PreFilterBolt.java index b986506..18106c3 100644 --- a/src/main/java/org/commoncrawl/stormcrawler/news/PreFilterBolt.java +++ b/src/main/java/org/commoncrawl/stormcrawler/news/PreFilterBolt.java @@ -1,9 +1,22 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.commoncrawl.stormcrawler.news; import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.Map; - import org.apache.commons.lang3.StringUtils; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; @@ -19,68 +32,67 @@ import org.slf4j.LoggerFactory; /** - * Variant of the URLFilterBolt to go upstream of the fetching to catch anything - * before it goes further into the topology. If filtered, a URL gets an ERROR - * status. + * Variant of the URLFilterBolt to go upstream of the fetching to catch anything before it goes + * further into the topology. If filtered, a URL gets an ERROR status. */ public class PreFilterBolt extends BaseRichBolt { - protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - - private URLFilters urlFilters; - - protected OutputCollector collector; - - private final String filterConfigFile; - - private static final String _s = org.apache.stormcrawler.Constants.StatusStreamName; - - public PreFilterBolt(String filterConfigFile) { - this.filterConfigFile = filterConfigFile; - } - - @Override - public void execute(Tuple input) { - - // must have at least a URL and metadata - String urlString = input.getStringByField("url"); - Metadata metadata = (Metadata) input.getValueByField("metadata"); - - String filtered = urlFilters.filter(null, null, urlString); - if (StringUtils.isBlank(filtered)) { - LOG.debug("URL rejected: {}", urlString); - // emit with an error to the status stream - metadata.addValue("error.cause", "Filtered"); - Values v = new Values(urlString, metadata, Status.ERROR); - collector.emit(_s, input, v); - collector.ack(input); - return; - } - - // pass to std out - Values v = new Values(urlString, metadata); - collector.emit(input, v); - collector.ack(input); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declareStream(_s, new Fields("url", "metadata", "status")); - declarer.declare(new Fields("url", "metadata")); - } - - @Override - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - this.collector = collector; - if (filterConfigFile != null) { - try { - urlFilters = new URLFilters(stormConf, filterConfigFile); - } catch (IOException e) { - throw new RuntimeException("Can't load filters from " + filterConfigFile); - } - } else { - urlFilters = URLFilters.fromConf(stormConf); - } - } - + protected static final Logger LOG = + LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private URLFilters urlFilters; + + protected OutputCollector collector; + + private final String filterConfigFile; + + private static final String _s = org.apache.stormcrawler.Constants.StatusStreamName; + + public PreFilterBolt(String filterConfigFile) { + this.filterConfigFile = filterConfigFile; + } + + @Override + public void execute(Tuple input) { + + // must have at least a URL and metadata + String urlString = input.getStringByField("url"); + Metadata metadata = (Metadata) input.getValueByField("metadata"); + + String filtered = urlFilters.filter(null, null, urlString); + if (StringUtils.isBlank(filtered)) { + LOG.debug("URL rejected: {}", urlString); + // emit with an error to the status stream + metadata.addValue("error.cause", "Filtered"); + Values v = new Values(urlString, metadata, Status.ERROR); + collector.emit(_s, input, v); + collector.ack(input); + return; + } + + // pass to std out + Values v = new Values(urlString, metadata); + collector.emit(input, v); + collector.ack(input); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declareStream(_s, new Fields("url", "metadata", "status")); + declarer.declare(new Fields("url", "metadata")); + } + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector = collector; + if (filterConfigFile != null) { + try { + urlFilters = new URLFilters(stormConf, filterConfigFile); + } catch (IOException e) { + throw new RuntimeException("Can't load filters from " + filterConfigFile); + } + } else { + urlFilters = URLFilters.fromConf(stormConf); + } + } } diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/PunycodeURLNormalizer.java b/src/main/java/org/commoncrawl/stormcrawler/news/PunycodeURLNormalizer.java index 4adf03d..114477c 100644 --- a/src/main/java/org/commoncrawl/stormcrawler/news/PunycodeURLNormalizer.java +++ b/src/main/java/org/commoncrawl/stormcrawler/news/PunycodeURLNormalizer.java @@ -13,20 +13,18 @@ */ package org.commoncrawl.stormcrawler.news; +import com.fasterxml.jackson.databind.JsonNode; import java.net.IDN; import java.net.MalformedURLException; import java.net.URL; import java.util.Map; - import org.apache.stormcrawler.Metadata; import org.apache.stormcrawler.filtering.URLFilter; -import com.fasterxml.jackson.databind.JsonNode; public class PunycodeURLNormalizer extends URLFilter { @Override - public void configure(Map stormConf, JsonNode filterParams) { - } + public void configure(Map stormConf, JsonNode filterParams) {} private boolean isAscii(String str) { char[] chars = str.toCharArray(); @@ -39,8 +37,7 @@ private boolean isAscii(String str) { } @Override - public String filter(URL sourceUrl, Metadata sourceMetadata, - String urlToFilter) { + public String filter(URL sourceUrl, Metadata sourceMetadata, String urlToFilter) { try { URL url = new URL(urlToFilter); String hostName = url.getHost(); @@ -51,12 +48,11 @@ public String filter(URL sourceUrl, Metadata sourceMetadata, if (hostName.equals(url.getHost())) { return urlToFilter; } - urlToFilter = new URL(url.getProtocol(), hostName, url.getPort(), - url.getFile()).toString(); + urlToFilter = + new URL(url.getProtocol(), hostName, url.getPort(), url.getFile()).toString(); } catch (MalformedURLException e) { return null; } return urlToFilter; } - } diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/BootstrapTopology.java b/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/BootstrapTopology.java index 821551e..14f45ff 100644 --- a/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/BootstrapTopology.java +++ b/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/BootstrapTopology.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to DigitalPebble Ltd under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -19,31 +19,27 @@ import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields; -import org.commoncrawl.stormcrawler.news.CrawlTopology; -import org.commoncrawl.stormcrawler.news.FeedDetectorBolt; -import org.slf4j.LoggerFactory; - import org.apache.stormcrawler.ConfigurableTopology; import org.apache.stormcrawler.Constants; import org.apache.stormcrawler.bolt.FetcherBolt; import org.apache.stormcrawler.bolt.JSoupParserBolt; import org.apache.stormcrawler.bolt.URLFilterBolt; import org.apache.stormcrawler.bolt.URLPartitionerBolt; +import org.apache.stormcrawler.indexing.DummyIndexer; import org.apache.stormcrawler.opensearch.persistence.AggregationSpout; import org.apache.stormcrawler.opensearch.persistence.StatusUpdaterBolt; -import org.apache.stormcrawler.indexing.DummyIndexer; import org.apache.stormcrawler.spout.FileSpout; import org.apache.stormcrawler.util.ConfUtils; import org.apache.stormcrawler.util.URLStreamGrouping; import org.apache.stormcrawler.warc.WARCHdfsBolt; +import org.commoncrawl.stormcrawler.news.CrawlTopology; +import org.commoncrawl.stormcrawler.news.FeedDetectorBolt; +import org.slf4j.LoggerFactory; -/** - * Dummy topology to play with the spouts and bolts on ElasticSearch - */ +/** Dummy topology to play with the spouts and bolts on ElasticSearch */ public class BootstrapTopology extends CrawlTopology { - private static final org.slf4j.Logger LOG = LoggerFactory - .getLogger(BootstrapTopology.class); + private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(BootstrapTopology.class); public static void main(String[] args) throws Exception { ConfigurableTopology.start(new BootstrapTopology(), args); @@ -53,11 +49,14 @@ public static void main(String[] args) throws Exception { protected int run(String[] args) { TopologyBuilder builder = new TopologyBuilder(); - LOG.debug("sitemap.sniffContent: {}", + LOG.debug( + "sitemap.sniffContent: {}", ConfUtils.getBoolean(getConf(), "sitemap.sniffContent", false)); - LOG.info("sitemap.sniffContent: {}", + LOG.info( + "sitemap.sniffContent: {}", ConfUtils.getBoolean(getConf(), "sitemap.sniffContent", false)); - LOG.warn("sitemap.sniffContent: {}", + LOG.warn( + "sitemap.sniffContent: {}", ConfUtils.getBoolean(getConf(), "sitemap.sniffContent", false)); int numWorkers = ConfUtils.getInt(getConf(), "topology.workers", 1); @@ -69,12 +68,11 @@ protected int run(String[] args) { if (args.length >= 2) { // arguments include seed directory and file pattern LOG.info("Injecting seeds from {} by pattern {}", args[0], args[1]); - builder.setSpout("filespout", - new FileSpout(args[0], args[1], true)); + builder.setSpout("filespout", new FileSpout(args[0], args[1], true)); Fields key = new Fields("url"); - builder.setBolt("filter", new URLFilterBolt()).fieldsGrouping( - "filespout", Constants.StatusStreamName, key); + builder.setBolt("filter", new URLFilterBolt()) + .fieldsGrouping("filespout", Constants.StatusStreamName, key); } builder.setSpout("spout", new AggregationSpout(), numShards); @@ -91,12 +89,10 @@ protected int run(String[] args) { builder.setBolt("feed", new FeedDetectorBolt(), numWorkers) .localOrShuffleGrouping("sitemap"); - builder.setBolt("parse", new JSoupParserBolt()) - .localOrShuffleGrouping("feed"); + builder.setBolt("parse", new JSoupParserBolt()).localOrShuffleGrouping("feed"); // don't need to parse the pages but need to update their status - builder.setBolt("ssb", new DummyIndexer(), numWorkers) - .localOrShuffleGrouping("parse"); + builder.setBolt("ssb", new DummyIndexer(), numWorkers).localOrShuffleGrouping("parse"); WARCHdfsBolt warcbolt = getWarcBolt("CC-NEWS-BOOTSTRAP"); @@ -109,8 +105,7 @@ protected int run(String[] args) { .localOrShuffleGrouping("parse", Constants.StatusStreamName) .localOrShuffleGrouping("ssb", Constants.StatusStreamName) .setNumTasks(numShards) - .customGrouping("filter", Constants.StatusStreamName, - new URLStreamGrouping()); + .customGrouping("filter", Constants.StatusStreamName, new URLStreamGrouping()); return submit(conf, builder); } diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/FeedLinkParseFilter.java b/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/FeedLinkParseFilter.java index 5707189..bb50291 100644 --- a/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/FeedLinkParseFilter.java +++ b/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/FeedLinkParseFilter.java @@ -14,19 +14,18 @@ package org.commoncrawl.stormcrawler.news.bootstrap; import java.util.ArrayList; - -import org.slf4j.LoggerFactory; -import org.w3c.dom.DocumentFragment; - import org.apache.stormcrawler.bolt.FeedParserBolt; import org.apache.stormcrawler.parse.Outlink; import org.apache.stormcrawler.parse.ParseResult; import org.apache.stormcrawler.parse.filter.LinkParseFilter; +import org.slf4j.LoggerFactory; +import org.w3c.dom.DocumentFragment; /** - * ParseFilter which extracts exclusively RSS links via Xpath, all other links - * are skipped. See {@link LinkParseFilter} how to register and configure in - * parsefilters.json. A configuration snippet: + * ParseFilter which extracts exclusively RSS links via Xpath, all other links are skipped. See + * {@link LinkParseFilter} how to register and configure in parsefilters.json. A configuration + * snippet: + * *
  *     {
  *      "class": "org.commoncrawl.stormcrawler.news.bootstrap.FeedLinkParseFilter",
@@ -41,12 +40,10 @@
  */
 public class FeedLinkParseFilter extends LinkParseFilter {
 
-    private static final org.slf4j.Logger LOG = LoggerFactory
-            .getLogger(FeedLinkParseFilter.class);
+    private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(FeedLinkParseFilter.class);
 
     @Override
-    public void filter(String URL, byte[] content, DocumentFragment doc,
-            ParseResult parse) {
+    public void filter(String URL, byte[] content, DocumentFragment doc, ParseResult parse) {
 
         // skip existing links
         logLinks(parse, URL, "Skipped links");
@@ -60,11 +57,8 @@ public void filter(String URL, byte[] content, DocumentFragment doc,
 
     public static void logLinks(ParseResult parse, String URL, String message) {
         if (LOG.isDebugEnabled() && parse.getOutlinks().size() > 0) {
-            if (!message.isEmpty())
-                LOG.debug("{} for {}:", message, URL);
-            for (Outlink outlink : parse.getOutlinks())
-                LOG.debug(outlink.getTargetURL());
+            if (!message.isEmpty()) LOG.debug("{} for {}:", message, URL);
+            for (Outlink outlink : parse.getOutlinks()) LOG.debug(outlink.getTargetURL());
         }
     }
-
 }
diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/NewsSiteMapDetectorBolt.java b/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/NewsSiteMapDetectorBolt.java
index 1160201..f0af134 100644
--- a/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/NewsSiteMapDetectorBolt.java
+++ b/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/NewsSiteMapDetectorBolt.java
@@ -14,15 +14,10 @@
 package org.commoncrawl.stormcrawler.news.bootstrap;
 
 import java.util.Map;
-
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
-import org.commoncrawl.stormcrawler.news.ContentDetector;
-import org.commoncrawl.stormcrawler.news.NewsSiteMapParserBolt;
-import org.slf4j.LoggerFactory;
-
 import org.apache.stormcrawler.Constants;
 import org.apache.stormcrawler.Metadata;
 import org.apache.stormcrawler.bolt.SiteMapParserBolt;
@@ -31,25 +26,26 @@
 import org.apache.stormcrawler.parse.ParseFilters;
 import org.apache.stormcrawler.parse.ParseResult;
 import org.apache.stormcrawler.persistence.Status;
+import org.commoncrawl.stormcrawler.news.ContentDetector;
+import org.commoncrawl.stormcrawler.news.NewsSiteMapParserBolt;
+import org.slf4j.LoggerFactory;
 
 /**
- * Detector for news
+ * Detector for news
  * sitemaps and also sitemaps.
  */
 @SuppressWarnings("serial")
 public class NewsSiteMapDetectorBolt extends SiteMapParserBolt {
 
-    private static final org.slf4j.Logger LOG = LoggerFactory
-            .getLogger(NewsSiteMapDetectorBolt.class);
+    private static final org.slf4j.Logger LOG =
+            LoggerFactory.getLogger(NewsSiteMapDetectorBolt.class);
 
     protected static final int maxOffsetContentGuess = 1024;
-    private static ContentDetector contentDetector = new ContentDetector(
-            NewsSiteMapParserBolt.contentClues, maxOffsetContentGuess);
+    private static ContentDetector contentDetector =
+            new ContentDetector(NewsSiteMapParserBolt.contentClues, maxOffsetContentGuess);
 
     private ParseFilter parseFilters;
 
-
     @Override
     public void execute(Tuple tuple) {
         Metadata metadata = (Metadata) tuple.getValueByField("metadata");
@@ -57,10 +53,9 @@ public void execute(Tuple tuple) {
         byte[] content = tuple.getBinaryByField("content");
         String url = tuple.getStringByField("url");
 
-        boolean isSitemap = Boolean.valueOf(
-                metadata.getFirstValue(SiteMapParserBolt.isSitemapKey));
-        boolean isNewsSitemap = Boolean.valueOf(
-                metadata.getFirstValue(NewsSiteMapParserBolt.isSitemapNewsKey));
+        boolean isSitemap = Boolean.valueOf(metadata.getFirstValue(SiteMapParserBolt.isSitemapKey));
+        boolean isNewsSitemap =
+                Boolean.valueOf(metadata.getFirstValue(NewsSiteMapParserBolt.isSitemapNewsKey));
 
         if (!isNewsSitemap || !isSitemap) {
             int match = contentDetector.getFirstMatch(content);
@@ -70,10 +65,8 @@ public void execute(Tuple tuple) {
                 metadata.setValue(SiteMapParserBolt.isSitemapKey, "true");
                 if (match <= NewsSiteMapParserBolt.contentCluesSitemapNewsMatchUpTo) {
                     isNewsSitemap = true;
-                    LOG.info("{} detected as news sitemap based on content",
-                            url);
-                    metadata.setValue(NewsSiteMapParserBolt.isSitemapNewsKey,
-                            "true");
+                    LOG.info("{} detected as news sitemap based on content", url);
+                    metadata.setValue(NewsSiteMapParserBolt.isSitemapNewsKey, "true");
                 }
             }
         }
@@ -85,8 +78,8 @@ public void execute(Tuple tuple) {
             parseData.setMetadata(metadata);
             parseFilters.filter(url, content, null, parse);
             // emit status
-            collector.emit(Constants.StatusStreamName, tuple,
-                    new Values(url, metadata, Status.FETCHED));
+            collector.emit(
+                    Constants.StatusStreamName, tuple, new Values(url, metadata, Status.FETCHED));
         } else {
             // pass on
             collector.emit(tuple, tuple.getValues());
@@ -95,11 +88,9 @@ public void execute(Tuple tuple) {
     }
 
     @Override
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    public void prepare(Map stormConf, TopologyContext context,
-            OutputCollector collect) {
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collect) {
         super.prepare(stormConf, context, collect);
         parseFilters = ParseFilters.fromConf(stormConf);
     }
-
 }
diff --git a/src/test/java/org/commoncrawl/stormcrawler/FastURLFilterTest.java b/src/test/java/org/commoncrawl/stormcrawler/FastURLFilterTest.java
index 64df0ba..58aaa04 100644
--- a/src/test/java/org/commoncrawl/stormcrawler/FastURLFilterTest.java
+++ b/src/test/java/org/commoncrawl/stormcrawler/FastURLFilterTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to DigitalPebble Ltd under one or more contributor license agreements. See the NOTICE
  * file distributed with this work for additional information regarding copyright ownership.
  * DigitalPebble licenses this file to You under the Apache License, Version 2.0 (the "License");
@@ -14,74 +14,72 @@
  */
 package org.commoncrawl.stormcrawler;
 
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.HashMap;
 import java.util.Map;
-
+import org.apache.stormcrawler.Metadata;
+import org.apache.stormcrawler.filtering.URLFilter;
 import org.commoncrawl.stormcrawler.filter.FastURLFilter;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import org.apache.stormcrawler.Metadata;
-import org.apache.stormcrawler.filtering.URLFilter;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
 public class FastURLFilterTest {
 
     protected static URLFilter filter;
 
     @BeforeClass
     public static void init() {
-	filter = createFilter("fast-urlfilter.txt");
+        filter = createFilter("fast-urlfilter.txt");
     }
 
     public static FastURLFilter createFilter(String fileName) {
-	ObjectNode filterParams = new ObjectNode(JsonNodeFactory.instance);
-	filterParams.put("file", fileName);
-	FastURLFilter filter = new FastURLFilter();
-	Map conf = new HashMap<>();
-	conf.put("fast.urlfilter.refresh", 10);
-	filter.configure(conf, filterParams);
-	return filter;
+        ObjectNode filterParams = new ObjectNode(JsonNodeFactory.instance);
+        filterParams.put("file", fileName);
+        FastURLFilter filter = new FastURLFilter();
+        Map conf = new HashMap<>();
+        conf.put("fast.urlfilter.refresh", 10);
+        filter.configure(conf, filterParams);
+        return filter;
     }
 
     @Test
     public void testHostFilter() throws MalformedURLException {
-	URL url = new URL("http://may.go.com/image.jpg");
-	Metadata metadata = new Metadata();
-	String filterResult = filter.filter(url, metadata, url.toExternalForm());
-	Assert.assertEquals(url.toString(), filterResult);
-	
-	url = new URL("http://no.go.com/");
-	filterResult = filter.filter(url, metadata, url.toExternalForm());
-	Assert.assertEquals(null, filterResult);
+        URL url = new URL("http://may.go.com/image.jpg");
+        Metadata metadata = new Metadata();
+        String filterResult = filter.filter(url, metadata, url.toExternalForm());
+        Assert.assertEquals(url.toString(), filterResult);
+
+        url = new URL("http://no.go.com/");
+        filterResult = filter.filter(url, metadata, url.toExternalForm());
+        Assert.assertEquals(null, filterResult);
     }
 
     @Test
     public void testDomainNotAllowed() throws MalformedURLException {
-	URL url = new URL("http://domainnotallowed.com/forum/search.php");
-	Metadata metadata = new Metadata();
-	String filterResult = filter.filter(url, metadata, url.toExternalForm());
-	Assert.assertEquals(null, filterResult);
-	
-	url = new URL("http://domainnotallowed.com/");
-	filterResult = filter.filter(url, metadata, url.toExternalForm());
-	Assert.assertEquals(null, filterResult);
-	
-	url = new URL("http://partiallyallowed.com/");
-	filterResult = filter.filter(url, metadata, url.toExternalForm());
-	Assert.assertEquals(url.toString(), filterResult);
-	
-	url = new URL("http://partiallyallowed.com/verbotten");
-	filterResult = filter.filter(url, metadata, url.toExternalForm());
-	Assert.assertEquals(null, filterResult);
+        URL url = new URL("http://domainnotallowed.com/forum/search.php");
+        Metadata metadata = new Metadata();
+        String filterResult = filter.filter(url, metadata, url.toExternalForm());
+        Assert.assertEquals(null, filterResult);
+
+        url = new URL("http://domainnotallowed.com/");
+        filterResult = filter.filter(url, metadata, url.toExternalForm());
+        Assert.assertEquals(null, filterResult);
+
+        url = new URL("http://partiallyallowed.com/");
+        filterResult = filter.filter(url, metadata, url.toExternalForm());
+        Assert.assertEquals(url.toString(), filterResult);
+
+        url = new URL("http://partiallyallowed.com/verbotten");
+        filterResult = filter.filter(url, metadata, url.toExternalForm());
+        Assert.assertEquals(null, filterResult);
 
-	// allowed
-	url = new URL("http://digitalpebble.com/");
-	filterResult = filter.filter(url, metadata, url.toExternalForm());
-	Assert.assertEquals(url.toString(), filterResult);
+        // allowed
+        url = new URL("http://digitalpebble.com/");
+        filterResult = filter.filter(url, metadata, url.toExternalForm());
+        Assert.assertEquals(url.toString(), filterResult);
     }
 }
diff --git a/src/test/java/org/commoncrawl/stormcrawler/news/NewsSiteMapParserTest.java b/src/test/java/org/commoncrawl/stormcrawler/news/NewsSiteMapParserTest.java
index db73d67..b0a0d5a 100644
--- a/src/test/java/org/commoncrawl/stormcrawler/news/NewsSiteMapParserTest.java
+++ b/src/test/java/org/commoncrawl/stormcrawler/news/NewsSiteMapParserTest.java
@@ -16,6 +16,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 
+import crawlercommons.sitemaps.UnknownFormatException;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -25,75 +26,83 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.commons.io.IOUtils;
-import org.commoncrawl.stormcrawler.news.NewsSiteMapParserBolt.SitemapType;
-import org.junit.Before;
-import org.junit.Test;
-
 import org.apache.stormcrawler.Metadata;
 import org.apache.stormcrawler.parse.Outlink;
 import org.apache.stormcrawler.parse.ParsingTester;
-
-import crawlercommons.sitemaps.UnknownFormatException;
+import org.commoncrawl.stormcrawler.news.NewsSiteMapParserBolt.SitemapType;
+import org.junit.Before;
+import org.junit.Test;
 
 public class NewsSiteMapParserTest extends ParsingTester {
 
     @Before
     public void setupParserBolt() {
-	setupParserBolt(new NewsSiteMapParserBolt());
-	Map config = new HashMap<>();
-	config.put("sitemap.sniffContent", true);
-	// allow items published during the last week
-	config.put("sitemap.filter.hours.since.modified", 168);
-	prepareParserBolt("test.parsefilters.json", config);
+        setupParserBolt(new NewsSiteMapParserBolt());
+        Map config = new HashMap<>();
+        config.put("sitemap.sniffContent", true);
+        // allow items published during the last week
+        config.put("sitemap.filter.hours.since.modified", 168);
+        prepareParserBolt("test.parsefilters.json", config);
     }
 
     @Test
     public void testSiteMapParser() throws IOException, UnknownFormatException {
-	String url = "https://example.org/sitemap-news.xml";
-	byte[] content = readContent("sitemap-news.xml");
-	String contentType = "";
-	Metadata parentMetadata = new Metadata();
-	List links = new ArrayList<>();
-
-	SitemapType type = ((NewsSiteMapParserBolt) bolt).detectContent(url, content);
-	assertEquals(SitemapType.NEWS, type);
-
-	((NewsSiteMapParserBolt) bolt).parseSiteMap(url, content, contentType, parentMetadata, links);
-
-	// unmodified sitemap:
-	// - publication date is far in the past, link should be skipped
-	// 2008-12-23
-	assertEquals("Outdated link not skipped", 0, links.size());
-
-	// now set the publication date to yesterday
-	LocalDateTime yesterday = LocalDateTime.now().minusDays(1);
-	content = (new String(content, StandardCharsets.UTF_8))
-		.replace("2008-12-23", ""
-			+ yesterday.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")) + "")
-		.getBytes(StandardCharsets.UTF_8);
-	((NewsSiteMapParserBolt) bolt).parseSiteMap(url, content, contentType, parentMetadata, links);
-
-	assertEquals("Expected one  and one additional  link - image links are ignored", 2,
-		links.size());
+        String url = "https://example.org/sitemap-news.xml";
+        byte[] content = readContent("sitemap-news.xml");
+        String contentType = "";
+        Metadata parentMetadata = new Metadata();
+        List links = new ArrayList<>();
+
+        SitemapType type = ((NewsSiteMapParserBolt) bolt).detectContent(url, content);
+        assertEquals(SitemapType.NEWS, type);
+
+        ((NewsSiteMapParserBolt) bolt)
+                .parseSiteMap(url, content, contentType, parentMetadata, links);
+
+        // unmodified sitemap:
+        // - publication date is far in the past, link should be skipped
+        // 2008-12-23
+        assertEquals("Outdated link not skipped", 0, links.size());
+
+        // now set the publication date to yesterday
+        LocalDateTime yesterday = LocalDateTime.now().minusDays(1);
+        content =
+                (new String(content, StandardCharsets.UTF_8))
+                        .replace(
+                                "2008-12-23",
+                                ""
+                                        + yesterday.format(
+                                                DateTimeFormatter.ofPattern("yyyy-MM-dd"))
+                                        + "")
+                        .getBytes(StandardCharsets.UTF_8);
+        ((NewsSiteMapParserBolt) bolt)
+                .parseSiteMap(url, content, contentType, parentMetadata, links);
+
+        assertEquals(
+                "Expected one  and one additional  link - image links are ignored",
+                2,
+                links.size());
     }
 
     protected byte[] readContent(String filename) throws IOException {
-	ByteArrayOutputStream baos = new ByteArrayOutputStream();
-	IOUtils.copy(getClass().getClassLoader().getResourceAsStream(filename), baos);
-	return baos.toByteArray();
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        IOUtils.copy(getClass().getClassLoader().getResourceAsStream(filename), baos);
+        return baos.toByteArray();
     }
 
-	@Test
-     public void testFeedWithSitemapNamespace() throws IOException, UnknownFormatException {
-         String url = "https://example.org/feed.xml";
-		byte[] content = readContent("feed-with-sitemap-namespace.xml");
-         SitemapType type = ((NewsSiteMapParserBolt) bolt).detectContent(url, content);
-         assertNotEquals("RSS feed with sitemap namespace should not be detected as sitemap",
-                 SitemapType.NEWS, type);
-         assertNotEquals("RSS feed with sitemap namespace should not be detected as sitemap",
-                 SitemapType.SITEMAP, type);
-     }
-
+    @Test
+    public void testFeedWithSitemapNamespace() throws IOException, UnknownFormatException {
+        String url = "https://example.org/feed.xml";
+        byte[] content = readContent("feed-with-sitemap-namespace.xml");
+        SitemapType type = ((NewsSiteMapParserBolt) bolt).detectContent(url, content);
+        assertNotEquals(
+                "RSS feed with sitemap namespace should not be detected as sitemap",
+                SitemapType.NEWS,
+                type);
+        assertNotEquals(
+                "RSS feed with sitemap namespace should not be detected as sitemap",
+                SitemapType.SITEMAP,
+                type);
+    }
 }