diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml
index dc52ef3..5c79a4c 100644
--- a/.github/workflows/maven.yml
+++ b/.github/workflows/maven.yml
@@ -63,5 +63,7 @@ jobs:
with:
distribution: adopt
java-version: ${{ matrix.java }}
+ - name: Check code formatting
+ run: mvn -B --no-transfer-progress com.cosium.code:git-code-format-maven-plugin:validate-code-format -Dskip.format.code=false
- name: Build with Maven
run: mvn -B --no-transfer-progress package --file pom.xml -DCI_ENV=true verify
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..37effdb
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,5 @@
+.idea
+target
+opensearchdata
+warcdata
+.java-version
\ No newline at end of file
diff --git a/.mvn/jvm.config b/.mvn/jvm.config
new file mode 100644
index 0000000..87ae20c
--- /dev/null
+++ b/.mvn/jvm.config
@@ -0,0 +1,8 @@
+--add-exports jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED
+--add-exports jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED
+--add-exports jdk.compiler/com.sun.tools.javac.main=ALL-UNNAMED
+--add-exports jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED
+--add-exports jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED
+--add-exports jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED
+--add-opens jdk.compiler/com.sun.tools.javac.code=ALL-UNNAMED
+--add-opens jdk.compiler/com.sun.tools.javac.comp=ALL-UNNAMED
diff --git a/README.md b/README.md
index 4efbf08..874da21 100644
--- a/README.md
+++ b/README.md
@@ -4,7 +4,7 @@ Crawler for news based on [StormCrawler](https://stormcrawler.apache.org/). Prod
## Prerequisites
-
+* JVM 17 or higher
* Install OpenSearch 2.19.5
* Install Apache Storm 2.8.8
* Start OpenSearch and Storm
@@ -119,3 +119,18 @@ NewsCrawl ACTIVE 48 1 146 NewsCrawl-1
$> storm kill NewsCrawl
```
+## Note for developers
+
+Please format your code before submitting a PR with
+
+```
+mvn git-code-format:format-code -Dgcf.globPattern="**/*" -Dskip.format.code=false
+```
+
+You can enable pre-commit format hooks by running:
+
+```
+mvn clean install -Dskip.format.code=false
+```
+
+
diff --git a/pom.xml b/pom.xml
index 0508210..69b7487 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,5 +1,4 @@
-
-
-
+4.0.0org.commoncrawl.stormcrawler.newscrawler3.6.0jar
+
+ https://github.com/commoncrawl/news-crawlApache License, Version 2.0
@@ -35,8 +36,6 @@ under the License.
- https://github.com/commoncrawl/news-crawl
-
UTF-83.6.0
@@ -47,89 +46,10 @@ under the License.
5.23.03.0.14.13.2
+ 5.4
+ true
-
-
-
- org.apache.maven.plugins
- maven-compiler-plugin
- 3.15.0
-
- 17
- 17
-
-
-
- org.codehaus.mojo
- exec-maven-plugin
- 3.6.3
-
-
-
- exec
-
-
-
-
- java
- true
- false
- compile
-
-
-
- org.apache.maven.plugins
- maven-shade-plugin
- 3.6.2
-
-
- package
-
- shade
-
-
- false
-
-
-
- org.apache.storm.flux.Flux
-
-
-
-
-
-
-
-
-
- *:*
-
- META-INF/*.SF
- META-INF/*.DSA
- META-INF/*.RSA
-
-
-
-
- org.apache.storm:flux-core
-
- org/apache/commons/**
- org/apache/http/**
- org/yaml/**
-
-
-
-
-
-
-
-
-
-
org.apache.stormcrawler
@@ -207,4 +127,125 @@ under the License.
test
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.15.0
+
+ 17
+ 17
+
+
+
+ org.codehaus.mojo
+ exec-maven-plugin
+ 3.6.3
+
+ java
+ true
+ false
+ compile
+
+
+
+
+ exec
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 3.6.2
+
+
+
+ shade
+
+ package
+
+ false
+
+
+
+ org.apache.storm.flux.Flux
+
+
+
+
+
+
+
+
+
+ *:*
+
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+
+
+
+
+ org.apache.storm:flux-core
+
+ org/apache/commons/**
+ org/apache/http/**
+ org/yaml/**
+
+
+
+
+
+
+
+
+ com.cosium.code
+ git-code-format-maven-plugin
+ ${git-code-format-maven-plugin.version}
+
+
+
+ install-formatter-hook
+
+ install-hooks
+
+
+
+
+ validate-code-format
+
+ validate-code-format
+
+
+
+
+
+
+ com.cosium.code
+ google-java-format
+ ${git-code-format-maven-plugin.version}
+
+
+
+ ${skip.format.code}
+
+ true
+ false
+ false
+ false
+
+
+
+
+
diff --git a/src/main/java/org/commoncrawl/stormcrawler/filter/FastURLFilter.java b/src/main/java/org/commoncrawl/stormcrawler/filter/FastURLFilter.java
index 7c52716..9b2ee32 100644
--- a/src/main/java/org/commoncrawl/stormcrawler/filter/FastURLFilter.java
+++ b/src/main/java/org/commoncrawl/stormcrawler/filter/FastURLFilter.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -16,6 +16,16 @@
*/
package org.commoncrawl.stormcrawler.filter;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.S3Object;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.LinkedHashMultimap;
+import com.google.common.collect.Multimap;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.IOException;
@@ -29,7 +39,6 @@
import java.util.TimerTask;
import java.util.regex.Pattern;
import java.util.zip.GZIPInputStream;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.stormcrawler.JSONResource;
import org.apache.stormcrawler.Metadata;
@@ -38,28 +47,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3ClientBuilder;
-import com.amazonaws.services.s3.model.GetObjectRequest;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.S3Object;
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.databind.JsonMappingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.google.common.collect.LinkedHashMultimap;
-import com.google.common.collect.Multimap;
-
/**
- * Version of the FastURLFilter that can load from a text representation instead
- * of the JSON that the SC version handles. Can also reload periodically and get
- * its content from S3.
- *
- * Filters URLs based on a file of regular expressions using host/domains
- * matching first. The default policy is to accept a URL if no matches are
- * found.
+ * Version of the FastURLFilter that can load from a text representation instead of the JSON that
+ * the SC version handles. Can also reload periodically and get its content from S3.
+ *
+ *
Filters URLs based on a file of regular expressions using host/domains matching first. The
+ * default policy is to accept a URL if no matches are found.
+ *
+ *
- *
- * Host rules are evaluated before Domain rules. For
- * Host rules the entire host name of a URL must match while the
- * domain names in Domain rules are considered as matches if the
- * domain is a suffix of the host name (consisting of complete host name parts).
- * Shorter domain suffixes are checked first, a single dot
- * "." as "domain name" can be used to specify
- * global rules applied to every URL.
- *
- * E.g., for "www.example.com" the rules given above are looked up in the
- * following order:
+ *
+ * Host rules are evaluated before Domain rules. For Host
+ * rules the entire host name of a URL must match while the domain names in Domain
+ * rules are considered as matches if the domain is a suffix of the host name (consisting of
+ * complete host name parts). Shorter domain suffixes are checked first, a single dot ".
+ * " as "domain name" can be used to specify global rules applied to every
+ * URL.
+ *
+ *
E.g., for "www.example.com" the rules given above are looked up in the following order:
+ *
*
- *
check "www.example.com" whether host-based rules exist and whether one of
- * them matches
- *
check "www.example.com" for domain-based rules
- *
check "example.com" for domain-based rules
- *
check "com" for domain-based rules
- *
check for global rules ("Domain .")
+ *
check "www.example.com" whether host-based rules exist and whether one of them matches
+ *
check "www.example.com" for domain-based rules
+ *
check "example.com" for domain-based rules
+ *
check "com" for domain-based rules
+ *
check for global rules ("Domain .")
*
- * The first matching rule will reject the URL and no further rules are checked.
- * If no rule matches the URL is accepted. URLs without a host name (e.g.,
- * file:/path/file.txt are checked for global rules only. URLs
- * which fail to be parsed as {@link java.net.URL} are always rejected.
- *
- * For rules either the URL path (DenyPath) or path and query
- * (DenyPathQuery) are checked whether the given
- * {@link java.util.regex Java Regular expression} is found (see
- * {@link java.util.regex.Matcher#find()}) in the URL path (and query).
- *
- * Rules are applied in the order of their definition. For better performance,
- * regular expressions which are simpler/faster or match more URLs should be
- * defined earlier.
- *
- * Comments in the rule file start with the # character and reach
- * until the end of the line.
- *
- * The rules file is defined via the property urlfilter.fast.file,
- * the default name is fast-urlfilter.txt.
+ *
+ * The first matching rule will reject the URL and no further rules are checked. If no rule matches
+ * the URL is accepted. URLs without a host name (e.g., file:/path/file.txt are checked
+ * for global rules only. URLs which fail to be parsed as {@link java.net.URL} are always rejected.
+ *
+ *
For rules either the URL path (DenyPath) or path and query (DenyPathQuery
+ * ) are checked whether the given {@link java.util.regex Java Regular expression} is found
+ * (see {@link java.util.regex.Matcher#find()}) in the URL path (and query).
+ *
+ *
Rules are applied in the order of their definition. For better performance, regular
+ * expressions which are simpler/faster or match more URLs should be defined earlier.
+ *
+ *
Comments in the rule file start with the # character and reach until the end of
+ * the line.
+ *
+ *
The rules file is defined via the property urlfilter.fast.file, the default name
+ * is fast-urlfilter.txt.
*/
-public class FastURLFilter extends URLFilter implements JSONResource {
+public class FastURLFilter extends URLFilter implements JSONResource {
- protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ protected static final Logger LOG =
+ LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final String URLFILTER_FAST_FILE = "urlfilter.fast.file";
private Multimap hostRules = LinkedHashMultimap.create();
@@ -121,302 +114,320 @@ public class FastURLFilter extends URLFilter implements JSONResource {
private String resourceFile;
- private static final Pattern CATCH_ALL_RULE = Pattern.compile("^\\s*DenyPath(?:Query)?\\s+\\.[*?]\\s*$");
+ private static final Pattern CATCH_ALL_RULE =
+ Pattern.compile("^\\s*DenyPath(?:Query)?\\s+\\.[*?]\\s*$");
private String resourceETAG;
public void configure(@SuppressWarnings("rawtypes") Map stormConf, JsonNode filterParams) {
- // read from conf first
- int refreshRate = ConfUtils.getInt(stormConf, "fast.urlfilter.refresh", -1);
- this.resourceFile = ConfUtils.getString(stormConf, "fast.urlfilter.file", null);
-
- // then from the param file (which needs recompiling in case of change)
- if (filterParams != null) {
- JsonNode node = filterParams.get("file");
- if (node != null && node.isTextual() && this.resourceFile == null) {
- this.resourceFile = node.asText();
- }
- node = filterParams.get("refresh");
- if (node != null && node.isInt() && refreshRate == -1) {
- refreshRate = node.asInt();
- }
- }
-
- try {
- loadJSONResources();
- } catch (Exception e) {
- LOG.error("Exception while loading resources", e);
- }
-
- if (refreshRate != -1) {
- LOG.info("Filter set to reload from {} every {} sec", getResourceFile(), refreshRate);
- new Timer().schedule(new TimerTask() {
- public void run() {
- LOG.info("Reloading resources");
- try {
- loadJSONResources();
- } catch (Exception e) {
- LOG.error("Can't load resources", e);
- }
- }
- }, refreshRate * 1000, refreshRate * 1000);
- }
+ // read from conf first
+ int refreshRate = ConfUtils.getInt(stormConf, "fast.urlfilter.refresh", -1);
+ this.resourceFile = ConfUtils.getString(stormConf, "fast.urlfilter.file", null);
+
+ // then from the param file (which needs recompiling in case of change)
+ if (filterParams != null) {
+ JsonNode node = filterParams.get("file");
+ if (node != null && node.isTextual() && this.resourceFile == null) {
+ this.resourceFile = node.asText();
+ }
+ node = filterParams.get("refresh");
+ if (node != null && node.isInt() && refreshRate == -1) {
+ refreshRate = node.asInt();
+ }
+ }
+
+ try {
+ loadJSONResources();
+ } catch (Exception e) {
+ LOG.error("Exception while loading resources", e);
+ }
+
+ if (refreshRate != -1) {
+ LOG.info("Filter set to reload from {} every {} sec", getResourceFile(), refreshRate);
+ new Timer()
+ .schedule(
+ new TimerTask() {
+ public void run() {
+ LOG.info("Reloading resources");
+ try {
+ loadJSONResources();
+ } catch (Exception e) {
+ LOG.error("Can't load resources", e);
+ }
+ }
+ },
+ refreshRate * 1000,
+ refreshRate * 1000);
+ }
}
/**
* Load the resources from the JSON file in the uber jar or from S3
- *
+ *
* @throws Exception
- **/
+ */
@Override
public void loadJSONResources() throws Exception {
- InputStream inputStream = null;
- AmazonS3 s3client = null;
- try {
- if (getResourceFile().startsWith("s3://")) {
- // try loading from S3
- s3client = AmazonS3ClientBuilder.standard().build();
- java.net.URI uri = new java.net.URI(getResourceFile());
-
- String bucketName = uri.getHost();
- // remove the first "/"
- String path = uri.getPath().substring(1);
-
- // optimisation - avoid a full reload if the resource has not changed
- ObjectMetadata metadata = s3client.getObjectMetadata(bucketName, path);
- final String ETAG = metadata.getETag();
- if (ETAG != null && ETAG.equals(resourceETAG)) {
- LOG.info("Unchanged ETAG for {} - skipping reload", getResourceFile());
- return;
- } else {
- resourceETAG = ETAG;
- }
-
- final S3Object object = s3client.getObject(new GetObjectRequest(bucketName, path));
- inputStream = object.getObjectContent();
- } else {
- inputStream = getClass().getClassLoader().getResourceAsStream(getResourceFile());
- if (inputStream == null) {
- LOG.error("Can't load conf from {}", getResourceFile());
- return;
- }
- }
- if (getResourceFile().endsWith(".gz")) {
- inputStream = new GZIPInputStream(inputStream);
- }
-
- loadJSONResources(new BufferedInputStream(inputStream));
- } finally {
- if (inputStream != null) {
- inputStream.close();
- }
- if (s3client != null) {
- s3client.shutdown();
- }
- }
+ InputStream inputStream = null;
+ AmazonS3 s3client = null;
+ try {
+ if (getResourceFile().startsWith("s3://")) {
+ // try loading from S3
+ s3client = AmazonS3ClientBuilder.standard().build();
+ java.net.URI uri = new java.net.URI(getResourceFile());
+
+ String bucketName = uri.getHost();
+ // remove the first "/"
+ String path = uri.getPath().substring(1);
+
+ // optimisation - avoid a full reload if the resource has not changed
+ ObjectMetadata metadata = s3client.getObjectMetadata(bucketName, path);
+ final String ETAG = metadata.getETag();
+ if (ETAG != null && ETAG.equals(resourceETAG)) {
+ LOG.info("Unchanged ETAG for {} - skipping reload", getResourceFile());
+ return;
+ } else {
+ resourceETAG = ETAG;
+ }
+
+ final S3Object object = s3client.getObject(new GetObjectRequest(bucketName, path));
+ inputStream = object.getObjectContent();
+ } else {
+ inputStream = getClass().getClassLoader().getResourceAsStream(getResourceFile());
+ if (inputStream == null) {
+ LOG.error("Can't load conf from {}", getResourceFile());
+ return;
+ }
+ }
+ if (getResourceFile().endsWith(".gz")) {
+ inputStream = new GZIPInputStream(inputStream);
+ }
+
+ loadJSONResources(new BufferedInputStream(inputStream));
+ } finally {
+ if (inputStream != null) {
+ inputStream.close();
+ }
+ if (s3client != null) {
+ s3client.shutdown();
+ }
+ }
}
@Override
public void loadJSONResources(InputStream inputStream)
- throws JsonParseException, JsonMappingException, IOException {
- long start = System.currentTimeMillis();
-
- try (Reader r = new InputStreamReader(inputStream)) {
- reloadRules(r);
- }
-
- long end = System.currentTimeMillis();
- LOG.info("Loaded {} hostrules and {} domain rules in {} msec from {}", hostRules.size(), domainRules.size(),
- (end - start), resourceFile);
+ throws JsonParseException, JsonMappingException, IOException {
+ long start = System.currentTimeMillis();
+
+ try (Reader r = new InputStreamReader(inputStream)) {
+ reloadRules(r);
+ }
+
+ long end = System.currentTimeMillis();
+ LOG.info(
+ "Loaded {} hostrules and {} domain rules in {} msec from {}",
+ hostRules.size(),
+ domainRules.size(),
+ (end - start),
+ resourceFile);
}
@Override
public String getResourceFile() {
- return resourceFile;
+ return resourceFile;
}
@Override
public String filter(URL sourceUrl, Metadata sourceMetadata, String urlToFilter) {
- synchronized (this) {
- URL u;
-
- try {
- u = new URL(urlToFilter);
- } catch (Exception e) {
- LOG.debug("Rejected {} because failed to parse as URL: {}", urlToFilter, e.getMessage());
- return null;
- }
-
- String hostname = u.getHost();
-
- // first check for host-specific rules
- for (Rule rule : hostRules.get(hostname)) {
- if (rule.match(u)) {
- return null;
- }
- }
-
- // also look up domain rules for host name
- for (Rule rule : domainRules.get(hostname)) {
- if (rule.match(u)) {
- return null;
- }
- }
-
- // check suffixes of host name from longer to shorter:
- // subdomains, domain, top-level domain
- int start = 0;
- int pos;
- while ((pos = hostname.indexOf('.', start)) != -1) {
- start = pos + 1;
- String domain = hostname.substring(start);
- for (Rule rule : domainRules.get(domain)) {
- if (rule.match(u)) {
- return null;
- }
- }
- }
-
- // finally check "global" rules defined for `Domain .`
- for (Rule rule : domainRules.get(".")) {
- if (rule.match(u)) {
- return null;
- }
- }
-
- // no reject rules found
- return urlToFilter;
- }
+ synchronized (this) {
+ URL u;
+
+ try {
+ u = new URL(urlToFilter);
+ } catch (Exception e) {
+ LOG.debug(
+ "Rejected {} because failed to parse as URL: {}",
+ urlToFilter,
+ e.getMessage());
+ return null;
+ }
+
+ String hostname = u.getHost();
+
+ // first check for host-specific rules
+ for (Rule rule : hostRules.get(hostname)) {
+ if (rule.match(u)) {
+ return null;
+ }
+ }
+
+ // also look up domain rules for host name
+ for (Rule rule : domainRules.get(hostname)) {
+ if (rule.match(u)) {
+ return null;
+ }
+ }
+
+ // check suffixes of host name from longer to shorter:
+ // subdomains, domain, top-level domain
+ int start = 0;
+ int pos;
+ while ((pos = hostname.indexOf('.', start)) != -1) {
+ start = pos + 1;
+ String domain = hostname.substring(start);
+ for (Rule rule : domainRules.get(domain)) {
+ if (rule.match(u)) {
+ return null;
+ }
+ }
+ }
+
+ // finally check "global" rules defined for `Domain .`
+ for (Rule rule : domainRules.get(".")) {
+ if (rule.match(u)) {
+ return null;
+ }
+ }
+
+ // no reject rules found
+ return urlToFilter;
+ }
}
private void reloadRules(Reader rules) throws IOException {
- synchronized (this) {
- domainRules.clear();
- hostRules.clear();
-
- BufferedReader reader = new BufferedReader(rules);
-
- String current = null;
- boolean host = false;
- int lineno = 0;
-
- String line;
- try {
- while ((line = reader.readLine()) != null) {
- lineno++;
- line = line.trim();
-
- if (line.indexOf("#") != -1) {
- // strip comments
- line = line.substring(0, line.indexOf("#")).trim();
- }
-
- if (StringUtils.isBlank(line)) {
- continue;
- }
-
- if (line.startsWith("Host")) {
- host = true;
- current = line.split("\\s+")[1];
- } else if (line.startsWith("Domain")) {
- host = false;
- current = line.split("\\s+")[1];
- } else {
- if (current == null) {
- continue;
- }
-
- Rule rule = null;
- try {
- if (CATCH_ALL_RULE.matcher(line).matches()) {
- rule = DenyAllRule.getInstance();
- } else if (line.startsWith("DenyPathQuery")) {
- rule = new DenyPathQueryRule(line.split("\\s+")[1]);
- } else if (line.startsWith("DenyPath")) {
- rule = new DenyPathRule(line.split("\\s+")[1]);
- } else {
- LOG.warn("Problem reading rule on line {}: {}", lineno, line);
- continue;
- }
- } catch (Exception e) {
- LOG.warn("Problem reading rule on line {}: {} - {}", lineno, line, e.getMessage());
- continue;
- }
-
- if (host) {
- LOG.trace("Adding host rule [{}] [{}]", current, rule);
- hostRules.put(current, rule);
- } else {
- LOG.trace("Adding domain rule [{}] [{}]", current, rule);
- domainRules.put(current, rule);
- }
- }
- }
-
- } catch (IOException e) {
- LOG.warn("Caught exception while reading rules file at line {}: {}", lineno, e.getMessage());
- throw e;
- }
- }
+ synchronized (this) {
+ domainRules.clear();
+ hostRules.clear();
+
+ BufferedReader reader = new BufferedReader(rules);
+
+ String current = null;
+ boolean host = false;
+ int lineno = 0;
+
+ String line;
+ try {
+ while ((line = reader.readLine()) != null) {
+ lineno++;
+ line = line.trim();
+
+ if (line.indexOf("#") != -1) {
+ // strip comments
+ line = line.substring(0, line.indexOf("#")).trim();
+ }
+
+ if (StringUtils.isBlank(line)) {
+ continue;
+ }
+
+ if (line.startsWith("Host")) {
+ host = true;
+ current = line.split("\\s+")[1];
+ } else if (line.startsWith("Domain")) {
+ host = false;
+ current = line.split("\\s+")[1];
+ } else {
+ if (current == null) {
+ continue;
+ }
+
+ Rule rule = null;
+ try {
+ if (CATCH_ALL_RULE.matcher(line).matches()) {
+ rule = DenyAllRule.getInstance();
+ } else if (line.startsWith("DenyPathQuery")) {
+ rule = new DenyPathQueryRule(line.split("\\s+")[1]);
+ } else if (line.startsWith("DenyPath")) {
+ rule = new DenyPathRule(line.split("\\s+")[1]);
+ } else {
+ LOG.warn("Problem reading rule on line {}: {}", lineno, line);
+ continue;
+ }
+ } catch (Exception e) {
+ LOG.warn(
+ "Problem reading rule on line {}: {} - {}",
+ lineno,
+ line,
+ e.getMessage());
+ continue;
+ }
+
+ if (host) {
+ LOG.trace("Adding host rule [{}] [{}]", current, rule);
+ hostRules.put(current, rule);
+ } else {
+ LOG.trace("Adding domain rule [{}] [{}]", current, rule);
+ domainRules.put(current, rule);
+ }
+ }
+ }
+
+ } catch (IOException e) {
+ LOG.warn(
+ "Caught exception while reading rules file at line {}: {}",
+ lineno,
+ e.getMessage());
+ throw e;
+ }
+ }
}
public static class Rule {
- protected Pattern pattern;
+ protected Pattern pattern;
- Rule() {
- }
+ Rule() {}
- public Rule(String regex) {
- pattern = Pattern.compile(regex);
- }
+ public Rule(String regex) {
+ pattern = Pattern.compile(regex);
+ }
- public boolean match(URL url) {
- return pattern.matcher(url.toString()).find();
- }
+ public boolean match(URL url) {
+ return pattern.matcher(url.toString()).find();
+ }
- public String toString() {
- return pattern.toString();
- }
+ public String toString() {
+ return pattern.toString();
+ }
}
public static class DenyPathRule extends Rule {
- public DenyPathRule(String regex) {
- super(regex);
- }
-
- public boolean match(URL url) {
- String haystack = url.getPath();
- return pattern.matcher(haystack).find();
- }
+ public DenyPathRule(String regex) {
+ super(regex);
+ }
+
+ public boolean match(URL url) {
+ String haystack = url.getPath();
+ return pattern.matcher(haystack).find();
+ }
}
/** Rule for DenyPath .* or DenyPath .? */
public static class DenyAllRule extends Rule {
- private static Rule instance = new DenyAllRule(".");
+ private static Rule instance = new DenyAllRule(".");
- private DenyAllRule(String regex) {
- super(regex);
- }
+ private DenyAllRule(String regex) {
+ super(regex);
+ }
- public static Rule getInstance() {
- return instance;
- }
+ public static Rule getInstance() {
+ return instance;
+ }
- public boolean match(URL url) {
- return true;
- }
+ public boolean match(URL url) {
+ return true;
+ }
}
public static class DenyPathQueryRule extends Rule {
- public DenyPathQueryRule(String regex) {
- super(regex);
- }
-
- public boolean match(URL url) {
- String haystack = url.getFile();
- return pattern.matcher(haystack).find();
- }
+ public DenyPathQueryRule(String regex) {
+ super(regex);
+ }
+
+ public boolean match(URL url) {
+ String haystack = url.getFile();
+ return pattern.matcher(haystack).find();
+ }
}
}
diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/ContentDetector.java b/src/main/java/org/commoncrawl/stormcrawler/news/ContentDetector.java
index 1c3a4a5..9979781 100644
--- a/src/main/java/org/commoncrawl/stormcrawler/news/ContentDetector.java
+++ b/src/main/java/org/commoncrawl/stormcrawler/news/ContentDetector.java
@@ -13,33 +13,28 @@
*/
package org.commoncrawl.stormcrawler.news;
+import com.google.common.primitives.Bytes;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
-import com.google.common.primitives.Bytes;
-
public class ContentDetector {
protected byte[][][] clues;
protected int maxOffset;
/**
- * Set up detector to detect content sniffing for a set of clue strings in a
- * prefix of the binary content.
+ * Set up detector to detect content sniffing for a set of clue strings in a prefix of the
+ * binary content.
*
- * @param clues
- * nested list of literal clues. Outer list defines an OR-group,
- * inner list contained ANDed clues required to match all, e.g.
- * the following definition would match if either
- * "clue1" and "and_clue2" are matched, or
- * alternatively "or_clue3" is found
+ * @param clues nested list of literal clues. Outer list defines an OR-group, inner list
+ * contained ANDed clues required to match all, e.g. the following definition would match if
+ * either "clue1" and "and_clue2" are matched, or alternatively
+ * "or_clue3" is found
+ *
+ * { { clue1, and_clue2 }, { or_clue3 } }
+ *
*
- *
- * { { clue1, and_clue2 }, { or_clue3 } }
- *
- *
- * @param maxOffset
- * max. offset of content prefix checked for clues
+ * @param maxOffset max. offset of content prefix checked for clues
*/
public ContentDetector(String[][] clues, int maxOffset) {
this.maxOffset = maxOffset;
@@ -60,8 +55,7 @@ public int getFirstMatch(byte[] content) {
for (int i = 0; i < clues.length; i++) {
byte[][] group = clues[i];
for (byte[] clue : group) {
- if (Bytes.indexOf(beginning, clue) == -1)
- continue OR;
+ if (Bytes.indexOf(beginning, clue) == -1) continue OR;
}
// success, all members of one group matched
return i;
@@ -72,5 +66,4 @@ public int getFirstMatch(byte[] content) {
public boolean matches(byte[] content) {
return (getFirstMatch(content) >= 0);
}
-
}
diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java b/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java
index 3a3c018..cc2d85a 100644
--- a/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java
+++ b/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to DigitalPebble Ltd under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -14,13 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.commoncrawl.stormcrawler.news;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
-
import org.apache.storm.topology.BoltDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
@@ -43,116 +41,126 @@
import org.apache.stormcrawler.warc.WARCHdfsBolt;
import org.slf4j.LoggerFactory;
-/**
- * Dummy topology to play with the spouts and bolts on OpenSearch
- */
+/** Dummy topology to play with the spouts and bolts on OpenSearch */
public class CrawlTopology extends ConfigurableTopology {
private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(CrawlTopology.class);
public static void main(String[] args) throws Exception {
- ConfigurableTopology.start(new CrawlTopology(), args);
+ ConfigurableTopology.start(new CrawlTopology(), args);
}
@Override
protected int run(String[] args) {
- TopologyBuilder builder = new TopologyBuilder();
+ TopologyBuilder builder = new TopologyBuilder();
- int numWorkers = ConfUtils.getInt(getConf(), "topology.workers", 1);
+ int numWorkers = ConfUtils.getInt(getConf(), "topology.workers", 1);
- // set to the real number of shards ONLY if es.status.routing is set to
- // true in the configuration
- int numShards = 16;
+ // set to the real number of shards ONLY if es.status.routing is set to
+ // true in the configuration
+ int numShards = 16;
- if (args.length >= 2) {
- // arguments include seed directory and file pattern
- LOG.info("Injecting seeds from {} by pattern {}", args[0], args[1]);
- builder.setSpout("filespout", new FileSpout(args[0], args[1], true));
- Fields key = new Fields("url");
+ if (args.length >= 2) {
+ // arguments include seed directory and file pattern
+ LOG.info("Injecting seeds from {} by pattern {}", args[0], args[1]);
+ builder.setSpout("filespout", new FileSpout(args[0], args[1], true));
+ Fields key = new Fields("url");
- builder.setBolt("filter", new URLFilterBolt()).fieldsGrouping("filespout", Constants.StatusStreamName, key);
- }
+ builder.setBolt("filter", new URLFilterBolt())
+ .fieldsGrouping("filespout", Constants.StatusStreamName, key);
+ }
- builder.setSpout("spout", new AggregationSpout(), numShards);
+ builder.setSpout("spout", new AggregationSpout(), numShards);
- builder.setBolt("prefilter", new PreFilterBolt("pre-urlfilters.json"), numWorkers).shuffleGrouping("spout");
+ builder.setBolt("prefilter", new PreFilterBolt("pre-urlfilters.json"), numWorkers)
+ .shuffleGrouping("spout");
- builder.setBolt("partitioner", new URLPartitionerBolt(), numWorkers).shuffleGrouping("prefilter");
+ builder.setBolt("partitioner", new URLPartitionerBolt(), numWorkers)
+ .shuffleGrouping("prefilter");
- builder.setBolt("fetch", new FetcherBolt(), numWorkers).fieldsGrouping("partitioner", new Fields("key"));
+ builder.setBolt("fetch", new FetcherBolt(), numWorkers)
+ .fieldsGrouping("partitioner", new Fields("key"));
- builder.setBolt("sitemap", new NewsSiteMapParserBolt(), numWorkers).setNumTasks(2)
- .localOrShuffleGrouping("fetch");
+ builder.setBolt("sitemap", new NewsSiteMapParserBolt(), numWorkers)
+ .setNumTasks(2)
+ .localOrShuffleGrouping("fetch");
- builder.setBolt("feed", new FeedParserBolt(), numWorkers).setNumTasks(4).localOrShuffleGrouping("sitemap");
+ builder.setBolt("feed", new FeedParserBolt(), numWorkers)
+ .setNumTasks(4)
+ .localOrShuffleGrouping("sitemap");
- // don't need to parse the pages but need to update their status
- builder.setBolt("ssb", new DummyIndexer(), numWorkers).localOrShuffleGrouping("feed");
+ // don't need to parse the pages but need to update their status
+ builder.setBolt("ssb", new DummyIndexer(), numWorkers).localOrShuffleGrouping("feed");
- WARCHdfsBolt warcbolt = getWarcBolt("CC-NEWS");
+ WARCHdfsBolt warcbolt = getWarcBolt("CC-NEWS");
- // take it from feed default output so that the feed files themselves
- // don't get included - unless we want them too of course!
- builder.setBolt("warc", warcbolt, numWorkers).localOrShuffleGrouping("feed");
-
- final Fields furl = new Fields("url");
+ // take it from feed default output so that the feed files themselves
+ // don't get included - unless we want them too of course!
+ builder.setBolt("warc", warcbolt, numWorkers).localOrShuffleGrouping("feed");
- BoltDeclarer statusBolt = builder.setBolt("status", new StatusUpdaterBolt(), numWorkers)
- .fieldsGrouping("fetch", Constants.StatusStreamName, furl)
- .fieldsGrouping("sitemap", Constants.StatusStreamName, furl)
- .fieldsGrouping("feed", Constants.StatusStreamName, furl)
- .fieldsGrouping("ssb", Constants.StatusStreamName, furl)
- .fieldsGrouping("prefilter", Constants.StatusStreamName, furl);
-
- if (args.length >= 2) {
- statusBolt.customGrouping("filter", Constants.StatusStreamName, new URLStreamGrouping());
- }
- statusBolt.setNumTasks(numShards);
+ final Fields furl = new Fields("url");
- return submit(conf, builder);
+ BoltDeclarer statusBolt =
+ builder.setBolt("status", new StatusUpdaterBolt(), numWorkers)
+ .fieldsGrouping("fetch", Constants.StatusStreamName, furl)
+ .fieldsGrouping("sitemap", Constants.StatusStreamName, furl)
+ .fieldsGrouping("feed", Constants.StatusStreamName, furl)
+ .fieldsGrouping("ssb", Constants.StatusStreamName, furl)
+ .fieldsGrouping("prefilter", Constants.StatusStreamName, furl);
+
+ if (args.length >= 2) {
+ statusBolt.customGrouping(
+ "filter", Constants.StatusStreamName, new URLStreamGrouping());
+ }
+ statusBolt.setNumTasks(numShards);
+
+ return submit(conf, builder);
}
protected WARCHdfsBolt getWarcBolt(String filePrefix) {
- // path is absolute
- String warcFilePath = ConfUtils.getString(getConf(), "warc.dir", "/data/warc");
-
- WARCFileNameFormat fileNameFormat = new WARCFileNameFormat();
- fileNameFormat.withPath(warcFilePath);
- fileNameFormat.withPrefix(filePrefix);
-
- Map fields = new LinkedHashMap<>();
- fields.put("software", "StormCrawler 2.10 https://stormcrawler.net/");
- fields.put("description", "News crawl for Common Crawl");
- String userAgent = AbstractHttpProtocol.getAgentString(getConf());
- fields.put("http-header-user-agent", userAgent);
- fields.put("http-header-from", ConfUtils.getString(getConf(), "http.agent.email"));
- String robotsTxtParser = "checked by crawler-commons " + crawlercommons.CrawlerCommons.getVersion()
- + " (https://github.com/crawler-commons/crawler-commons)";
- fields.put("robots", robotsTxtParser);
- fields.put("format", "WARC File Format 1.1");
- fields.put("conformsTo", "https://iipc.github.io/warc-specifications/specifications/warc-format/warc-1.1/");
-
- WARCHdfsBolt warcbolt = (WARCHdfsBolt) new WARCHdfsBolt();
- warcbolt.withConfigKey("warc");
- warcbolt.withFileNameFormat(fileNameFormat);
- warcbolt.withHeader(fields);
- warcbolt.withRequestRecords();
-
- // use RawLocalFileSystem (instead of ChecksumFileSystem) to avoid that
- // WARC files are truncated if the topology is stopped because of a
- // delayed sync of the default ChecksumFileSystem
- Map hdfsConf = new HashMap<>();
- hdfsConf.put("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem");
- getConf().put("warc", hdfsConf);
-
- // will rotate if reaches size or time limit
- int maxMB = ConfUtils.getInt(getConf(), "warc.rotation.policy.max-mb", 1024);
- int maxMinutes = ConfUtils.getInt(getConf(), "warc.rotation.policy.max-minutes", 1440);
- FileTimeSizeRotationPolicy rotpol = new FileTimeSizeRotationPolicy(maxMB, Units.MB);
- rotpol.setTimeRotationInterval(maxMinutes, FileTimeSizeRotationPolicy.TimeUnit.MINUTES);
- warcbolt.withRotationPolicy(rotpol);
-
- return warcbolt;
+ // path is absolute
+ String warcFilePath = ConfUtils.getString(getConf(), "warc.dir", "/data/warc");
+
+ WARCFileNameFormat fileNameFormat = new WARCFileNameFormat();
+ fileNameFormat.withPath(warcFilePath);
+ fileNameFormat.withPrefix(filePrefix);
+
+ Map fields = new LinkedHashMap<>();
+ fields.put("software", "StormCrawler 2.10 https://stormcrawler.net/");
+ fields.put("description", "News crawl for Common Crawl");
+ String userAgent = AbstractHttpProtocol.getAgentString(getConf());
+ fields.put("http-header-user-agent", userAgent);
+ fields.put("http-header-from", ConfUtils.getString(getConf(), "http.agent.email"));
+ String robotsTxtParser =
+ "checked by crawler-commons "
+ + crawlercommons.CrawlerCommons.getVersion()
+ + " (https://github.com/crawler-commons/crawler-commons)";
+ fields.put("robots", robotsTxtParser);
+ fields.put("format", "WARC File Format 1.1");
+ fields.put(
+ "conformsTo",
+ "https://iipc.github.io/warc-specifications/specifications/warc-format/warc-1.1/");
+
+ WARCHdfsBolt warcbolt = (WARCHdfsBolt) new WARCHdfsBolt();
+ warcbolt.withConfigKey("warc");
+ warcbolt.withFileNameFormat(fileNameFormat);
+ warcbolt.withHeader(fields);
+ warcbolt.withRequestRecords();
+
+ // use RawLocalFileSystem (instead of ChecksumFileSystem) to avoid that
+ // WARC files are truncated if the topology is stopped because of a
+ // delayed sync of the default ChecksumFileSystem
+ Map hdfsConf = new HashMap<>();
+ hdfsConf.put("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem");
+ getConf().put("warc", hdfsConf);
+
+ // will rotate if reaches size or time limit
+ int maxMB = ConfUtils.getInt(getConf(), "warc.rotation.policy.max-mb", 1024);
+ int maxMinutes = ConfUtils.getInt(getConf(), "warc.rotation.policy.max-minutes", 1440);
+ FileTimeSizeRotationPolicy rotpol = new FileTimeSizeRotationPolicy(maxMB, Units.MB);
+ rotpol.setTimeRotationInterval(maxMinutes, FileTimeSizeRotationPolicy.TimeUnit.MINUTES);
+ warcbolt.withRotationPolicy(rotpol);
+
+ return warcbolt;
}
-
}
diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/FeedDetectorBolt.java b/src/main/java/org/commoncrawl/stormcrawler/news/FeedDetectorBolt.java
index c365525..e607dc2 100644
--- a/src/main/java/org/commoncrawl/stormcrawler/news/FeedDetectorBolt.java
+++ b/src/main/java/org/commoncrawl/stormcrawler/news/FeedDetectorBolt.java
@@ -14,13 +14,11 @@
package org.commoncrawl.stormcrawler.news;
import java.util.Map;
-
+import org.apache.http.HttpHeaders;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
-import org.slf4j.LoggerFactory;
-
import org.apache.stormcrawler.Constants;
import org.apache.stormcrawler.Metadata;
import org.apache.stormcrawler.bolt.FeedParserBolt;
@@ -29,28 +27,23 @@
import org.apache.stormcrawler.parse.ParseFilters;
import org.apache.stormcrawler.parse.ParseResult;
import org.apache.stormcrawler.persistence.Status;
-import org.apache.http.HttpHeaders;
+import org.slf4j.LoggerFactory;
/** Detect RSS and Atom feeds, but do not parse and extract links */
@SuppressWarnings("serial")
public class FeedDetectorBolt extends FeedParserBolt {
- private static final org.slf4j.Logger LOG = LoggerFactory
- .getLogger(FeedDetectorBolt.class);
+ private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(FeedDetectorBolt.class);
- public static final String[] mimeTypeClues = {
- "rss+xml", "atom+xml", "text/rss"
- };
+ public static final String[] mimeTypeClues = {"rss+xml", "atom+xml", "text/rss"};
- public static String[][] contentClues = { { "<{}> for {}",
- ct, url);
+ LOG.info("Feed detected from content type <{}> for {}", ct, url);
break;
}
}
@@ -90,8 +82,8 @@ public void execute(Tuple tuple) {
parseData.setMetadata(metadata);
parseFilters.filter(url, content, null, parse);
// emit status
- collector.emit(Constants.StatusStreamName, tuple,
- new Values(url, metadata, Status.FETCHED));
+ collector.emit(
+ Constants.StatusStreamName, tuple, new Values(url, metadata, Status.FETCHED));
} else {
// pass on
collector.emit(tuple, tuple.getValues());
@@ -100,11 +92,9 @@ public void execute(Tuple tuple) {
}
@Override
- @SuppressWarnings({ "rawtypes" })
- public void prepare(Map stormConf, TopologyContext context,
- OutputCollector collect) {
+ @SuppressWarnings({"rawtypes"})
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collect) {
super.prepare(stormConf, context, collect);
parseFilters = ParseFilters.fromConf(stormConf);
}
-
}
diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/NewsSiteMapParserBolt.java b/src/main/java/org/commoncrawl/stormcrawler/news/NewsSiteMapParserBolt.java
index 3c4cf55..187d086 100644
--- a/src/main/java/org/commoncrawl/stormcrawler/news/NewsSiteMapParserBolt.java
+++ b/src/main/java/org/commoncrawl/stormcrawler/news/NewsSiteMapParserBolt.java
@@ -15,6 +15,18 @@
import static org.apache.stormcrawler.Constants.StatusStreamName;
+import crawlercommons.sitemaps.AbstractSiteMap;
+import crawlercommons.sitemaps.Namespace;
+import crawlercommons.sitemaps.SiteMap;
+import crawlercommons.sitemaps.SiteMapIndex;
+import crawlercommons.sitemaps.SiteMapParser;
+import crawlercommons.sitemaps.SiteMapURL;
+import crawlercommons.sitemaps.SiteMapURL.ChangeFrequency;
+import crawlercommons.sitemaps.UnknownFormatException;
+import crawlercommons.sitemaps.extension.Extension;
+import crawlercommons.sitemaps.extension.ExtensionMetadata;
+import crawlercommons.sitemaps.extension.LinkAttributes;
+import crawlercommons.sitemaps.extension.NewsAttributes;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
@@ -24,7 +36,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHeaders;
import org.apache.storm.metric.api.MeanReducer;
@@ -46,56 +57,45 @@
import org.apache.stormcrawler.util.ConfUtils;
import org.slf4j.LoggerFactory;
-import crawlercommons.sitemaps.AbstractSiteMap;
-import crawlercommons.sitemaps.Namespace;
-import crawlercommons.sitemaps.SiteMap;
-import crawlercommons.sitemaps.SiteMapIndex;
-import crawlercommons.sitemaps.SiteMapParser;
-import crawlercommons.sitemaps.SiteMapURL;
-import crawlercommons.sitemaps.SiteMapURL.ChangeFrequency;
-import crawlercommons.sitemaps.UnknownFormatException;
-import crawlercommons.sitemaps.extension.Extension;
-import crawlercommons.sitemaps.extension.ExtensionMetadata;
-import crawlercommons.sitemaps.extension.LinkAttributes;
-import crawlercommons.sitemaps.extension.NewsAttributes;
-
-
/**
- * ParserBolt for news
+ * ParserBolt for news
* sitemaps.
*/
@SuppressWarnings("serial")
public class NewsSiteMapParserBolt extends SiteMapParserBolt {
// TODO:
- // this is a modified copy of c.d.s.bolt.SiteMapParserBolt
- // - make parent class extensible and overridable
- // modifications:
- // - detect and process only Google news sitemaps
- // - or a sitemapindex because some subsitemaps may
- // be news sitemaps
- // - pass "isSitemapNews" to status metadata
+ // this is a modified copy of c.d.s.bolt.SiteMapParserBolt
+ // - make parent class extensible and overridable
+ // modifications:
+ // - detect and process only Google news sitemaps
+ // - or a sitemapindex because some subsitemaps may
+ // be news sitemaps
+ // - pass "isSitemapNews" to status metadata
public static enum SitemapType {
- NEWS, INDEX, SITEMAP
+ NEWS,
+ INDEX,
+ SITEMAP
}
public static final String isSitemapNewsKey = "isSitemapNews";
public static final String isSitemapIndexKey = "isSitemapIndex";
+
/**
- * A sitemap (not necessarily a news sitemap) which is verified to contain
- * links to news articles. Necessary to crawl news sites which provide a
- * sitemap but neither a news feed or sitemap.
+ * A sitemap (not necessarily a news sitemap) which is verified to contain links to news
+ * articles. Necessary to crawl news sites which provide a sitemap but neither a news feed or
+ * sitemap.
*/
public static final String isSitemapVerifiedKey = "isSitemapVerified";
- private static final org.slf4j.Logger LOG = LoggerFactory
- .getLogger(NewsSiteMapParserBolt.class);
+ private static final org.slf4j.Logger LOG =
+ LoggerFactory.getLogger(NewsSiteMapParserBolt.class);
/* content clues for news sitemaps, sitemap indexes or any sitemaps */
public static String[][] contentClues;
public static int contentCluesSitemapNewsMatchUpTo = -1;
public static int contentCluesSitemapIndexMatchUpTo = -1;
+
static {
int cluesSize = Namespace.NEWS.length + 1 + 1 + Namespace.SITEMAP_LEGACY.length;
contentClues = new String[cluesSize][1];
@@ -129,7 +129,7 @@ public static enum SitemapType {
private ReducedMetric averagedMetrics;
- /** Delay in minutes used for scheduling sub-sitemaps **/
+ /** Delay in minutes used for scheduling sub-sitemaps * */
private int scheduleSitemapsWithDelay = -1;
@Override
@@ -140,14 +140,10 @@ public void execute(Tuple tuple) {
byte[] content = tuple.getBinaryByField("content");
String url = tuple.getStringByField("url");
- boolean isSitemap = Boolean.valueOf(
- metadata.getFirstValue(SiteMapParserBolt.isSitemapKey));
- boolean isNewsSitemap = Boolean
- .valueOf(metadata.getFirstValue(isSitemapNewsKey));
- boolean isSitemapIndex = Boolean
- .valueOf(metadata.getFirstValue(isSitemapIndexKey));
- boolean isSitemapVerified = Boolean
- .valueOf(metadata.getFirstValue(isSitemapVerifiedKey));
+ boolean isSitemap = Boolean.valueOf(metadata.getFirstValue(SiteMapParserBolt.isSitemapKey));
+ boolean isNewsSitemap = Boolean.valueOf(metadata.getFirstValue(isSitemapNewsKey));
+ boolean isSitemapIndex = Boolean.valueOf(metadata.getFirstValue(isSitemapIndexKey));
+ boolean isSitemapVerified = Boolean.valueOf(metadata.getFirstValue(isSitemapVerifiedKey));
if (sniffContent) {
SitemapType type = detectContent(url, content);
@@ -183,14 +179,16 @@ public void execute(Tuple tuple) {
if (isNewsSitemap || isSitemapIndex || isSitemapVerified) {
/*
- * remove the isSitemap key from metadata to avoid that the default
- * sitemap fetch interval is applied to news sitemaps, sitemap
- * indexes and verified sitemaps
+ * remove the isSitemap key from metadata to avoid that the default sitemap
+ * fetch interval is applied to news sitemaps, sitemap indexes and verified
+ * sitemaps
*/
metadata.remove(isSitemapKey);
} else {
if (isSitemap) {
- collector.emit(Constants.StatusStreamName, tuple,
+ collector.emit(
+ Constants.StatusStreamName,
+ tuple,
new Values(url, metadata, Status.FETCHED));
} else {
// not a sitemap, just pass it on
@@ -217,8 +215,8 @@ public void execute(Tuple tuple) {
metadata.setValue(Constants.STATUS_ERROR_SOURCE, "sitemap parsing");
metadata.setValue(Constants.STATUS_ERROR_MESSAGE, errorMessage);
metadata.remove("numLinks");
- collector.emit(Constants.StatusStreamName, tuple, new Values(url,
- metadata, Status.ERROR));
+ collector.emit(
+ Constants.StatusStreamName, tuple, new Values(url, metadata, Status.ERROR));
collector.ack(tuple);
return;
}
@@ -232,15 +230,12 @@ public void execute(Tuple tuple) {
parseFilters.filter(url, content, null, parse);
} catch (RuntimeException e) {
- String errorMessage = "Exception while running parse filters on "
- + url + ": " + e;
+ String errorMessage = "Exception while running parse filters on " + url + ": " + e;
LOG.error(errorMessage);
- metadata.setValue(Constants.STATUS_ERROR_SOURCE,
- "content filtering");
+ metadata.setValue(Constants.STATUS_ERROR_SOURCE, "content filtering");
metadata.setValue(Constants.STATUS_ERROR_MESSAGE, errorMessage);
metadata.remove("numLinks");
- collector.emit(StatusStreamName, tuple, new Values(url, metadata,
- Status.ERROR));
+ collector.emit(StatusStreamName, tuple, new Values(url, metadata, Status.ERROR));
collector.ack(tuple);
return;
}
@@ -263,8 +258,7 @@ public void execute(Tuple tuple) {
ol.getMetadata().setValue(isSitemapVerifiedKey, "true");
}
}
- Values v = new Values(ol.getTargetURL(), ol.getMetadata(),
- Status.DISCOVERED);
+ Values v = new Values(ol.getTargetURL(), ol.getMetadata(), Status.DISCOVERED);
collector.emit(Constants.StatusStreamName, tuple, v);
}
@@ -272,8 +266,8 @@ public void execute(Tuple tuple) {
metadata.setValue("numLinks", String.valueOf(outlinks.size()));
// marking the main URL as successfully fetched
- collector.emit(Constants.StatusStreamName, tuple, new Values(url,
- metadata, Status.FETCHED));
+ collector.emit(
+ Constants.StatusStreamName, tuple, new Values(url, metadata, Status.FETCHED));
collector.ack(tuple);
}
@@ -291,12 +285,10 @@ public SitemapType detectContent(String url, byte[] content) {
if (match >= 0) {
// a sitemap, need to detect type of sitemap
if (match <= contentCluesSitemapNewsMatchUpTo) {
- LOG.info("{} detected as news sitemap based on content",
- url);
+ LOG.info("{} detected as news sitemap based on content", url);
return SitemapType.NEWS;
} else if (match <= contentCluesSitemapIndexMatchUpTo) {
- LOG.info("{} detected as sitemap index based on content",
- url);
+ LOG.info("{} detected as sitemap index based on content", url);
return SitemapType.INDEX;
} else {
return SitemapType.SITEMAP;
@@ -317,12 +309,15 @@ private boolean recentlyModified(Date lastModified) {
return true;
}
- protected AbstractSiteMap parseSiteMap(String url, byte[] content,
- String contentType, Metadata parentMetadata, List links)
+ protected AbstractSiteMap parseSiteMap(
+ String url,
+ byte[] content,
+ String contentType,
+ Metadata parentMetadata,
+ List links)
throws UnknownFormatException, IOException {
- SiteMapParser parser = new SiteMapParser(strictModeSitemaps,
- allowPartialSitemaps);
+ SiteMapParser parser = new SiteMapParser(strictModeSitemaps, allowPartialSitemaps);
parser.setStrictNamespace(true);
parser.addAcceptedNamespace(Namespace.SITEMAP_LEGACY);
parser.addAcceptedNamespace(Namespace.EMPTY);
@@ -334,8 +329,7 @@ protected AbstractSiteMap parseSiteMap(String url, byte[] content,
long start = System.currentTimeMillis();
AbstractSiteMap siteMap;
// let the parser guess what the mimetype is
- if (StringUtils.isBlank(contentType)
- || contentType.contains("octet-stream")) {
+ if (StringUtils.isBlank(contentType) || contentType.contains("octet-stream")) {
siteMap = parser.parseSiteMap(content, sURL);
} else {
siteMap = parser.parseSiteMap(contentType, content, sURL);
@@ -351,8 +345,8 @@ protected AbstractSiteMap parseSiteMap(String url, byte[] content,
Collection subsitemaps = smi.getSitemaps();
int delay = 0;
/*
- * keep the subsitemaps as outlinks they will be fetched and parsed
- * in the following steps
+ * keep the subsitemaps as outlinks they will be fetched and parsed in the
+ * following steps
*/
Iterator iter = subsitemaps.iterator();
while (iter.hasNext()) {
@@ -365,13 +359,21 @@ protected AbstractSiteMap parseSiteMap(String url, byte[] content,
linksSkippedNotRecentlyModified++;
LOG.debug(
"{} has a modified date {} which is more than {} hours old",
- target, lastModified.toString(),
+ target,
+ lastModified.toString(),
filterHoursSinceModified);
continue;
}
- Outlink ol = filterOutlink(sURL, target, parentMetadata,
- isSitemapKey, "true", isSitemapNewsKey, "false");
+ Outlink ol =
+ filterOutlink(
+ sURL,
+ target,
+ parentMetadata,
+ isSitemapKey,
+ "true",
+ isSitemapNewsKey,
+ "false");
if (ol == null) {
continue;
}
@@ -379,9 +381,8 @@ protected AbstractSiteMap parseSiteMap(String url, byte[] content,
// add a delay
if (this.scheduleSitemapsWithDelay > 0) {
if (delay > 0) {
- ol.getMetadata().setValue(
- DefaultScheduler.DELAY_METADATA,
- Integer.toString(delay));
+ ol.getMetadata()
+ .setValue(DefaultScheduler.DELAY_METADATA, Integer.toString(delay));
}
delay += this.scheduleSitemapsWithDelay;
}
@@ -389,15 +390,19 @@ protected AbstractSiteMap parseSiteMap(String url, byte[] content,
links.add(ol);
LOG.debug("{} : [sitemap] {}", url, target);
}
- LOG.info("Sitemap index (found {} sitemaps, {} skipped): {}",
- linksFound, linksSkippedNotRecentlyModified, url);
+ LOG.info(
+ "Sitemap index (found {} sitemaps, {} skipped): {}",
+ linksFound,
+ linksSkippedNotRecentlyModified,
+ url);
}
// sitemap files
else {
SiteMap sm = (SiteMap) siteMap;
Collection sitemapURLs = sm.getSiteMapUrls();
Iterator iter = sitemapURLs.iterator();
- sitemap_urls: while (iter.hasNext()) {
+ sitemap_urls:
+ while (iter.hasNext()) {
linksFound++;
SiteMapURL smurl = iter.next();
// TODO handle priority in metadata
@@ -414,11 +419,12 @@ protected AbstractSiteMap parseSiteMap(String url, byte[] content,
linksSkippedNotRecentlyModified++;
LOG.debug(
"{} has a modified date {} which is more than {} hours old",
- target, lastModified, filterHoursSinceModified);
+ target,
+ lastModified,
+ filterHoursSinceModified);
continue;
}
- ExtensionMetadata[] newsAttrs = smurl
- .getAttributesForExtension(Extension.NEWS);
+ ExtensionMetadata[] newsAttrs = smurl.getAttributesForExtension(Extension.NEWS);
if (newsAttrs != null) {
// filter based on news publication date
// 2008-12-23
@@ -429,7 +435,9 @@ protected AbstractSiteMap parseSiteMap(String url, byte[] content,
linksSkippedNotRecentlyModified++;
LOG.debug(
"{} has a news publication date {} which is more than {} hours old",
- target, pubDate, filterHoursSinceModified);
+ target,
+ pubDate,
+ filterHoursSinceModified);
continue sitemap_urls;
}
}
@@ -437,8 +445,7 @@ protected AbstractSiteMap parseSiteMap(String url, byte[] content,
}
// add alternative language links
- ExtensionMetadata[] linkAttrs = smurl
- .getAttributesForExtension(Extension.LINKS);
+ ExtensionMetadata[] linkAttrs = smurl.getAttributesForExtension(Extension.LINKS);
if (linkAttrs != null) {
for (ExtensionMetadata attr : linkAttrs) {
LinkAttributes linkAttr = (LinkAttributes) attr;
@@ -451,17 +458,30 @@ protected AbstractSiteMap parseSiteMap(String url, byte[] content,
// skip href links duplicating sitemap URL
continue;
}
- Outlink ol = filterOutlink(sURL, hrefStr,
- parentMetadata, isSitemapKey, "false",
- isSitemapNewsKey, "false");
+ Outlink ol =
+ filterOutlink(
+ sURL,
+ hrefStr,
+ parentMetadata,
+ isSitemapKey,
+ "false",
+ isSitemapNewsKey,
+ "false");
if (ol != null) {
links.add(ol);
}
}
}
- Outlink ol = filterOutlink(sURL, target, parentMetadata,
- isSitemapKey, "false", isSitemapNewsKey, "false");
+ Outlink ol =
+ filterOutlink(
+ sURL,
+ target,
+ parentMetadata,
+ isSitemapKey,
+ "false",
+ isSitemapNewsKey,
+ "false");
if (ol == null) {
continue;
}
@@ -469,34 +489,33 @@ protected AbstractSiteMap parseSiteMap(String url, byte[] content,
links.add(ol);
LOG.debug("{} : [sitemap] {}", url, target);
}
- LOG.info("Sitemap (found {} links, {} skipped): {}", linksFound,
- linksSkippedNotRecentlyModified, url);
+ LOG.info(
+ "Sitemap (found {} links, {} skipped): {}",
+ linksFound,
+ linksSkippedNotRecentlyModified,
+ url);
}
return siteMap;
}
@Override
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public void prepare(Map stormConf, TopologyContext context,
- OutputCollector collector) {
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
super.prepare(stormConf, context, collector);
- sniffContent = ConfUtils.getBoolean(stormConf,
- "sitemap.sniffContent", false);
- filterHoursSinceModified = ConfUtils.getInt(stormConf,
- "sitemap.filter.hours.since.modified", -1);
+ sniffContent = ConfUtils.getBoolean(stormConf, "sitemap.sniffContent", false);
+ filterHoursSinceModified =
+ ConfUtils.getInt(stormConf, "sitemap.filter.hours.since.modified", -1);
parseFilters = ParseFilters.fromConf(stormConf);
- int maxOffsetGuess = ConfUtils.getInt(stormConf, "sitemap.offset.guess",
- 1024);
- contentDetector = new ContentDetector(
- NewsSiteMapParserBolt.contentClues, maxOffsetGuess);
- rssContentDetector = new ContentDetector(
- FeedDetectorBolt.contentClues, maxOffsetGuess);
- averagedMetrics = context.registerMetric(
- "news_sitemap_average_processing_time",
- new ReducedMetric(new MeanReducer()), 30);
- scheduleSitemapsWithDelay = ConfUtils.getInt(stormConf,
- "sitemap.schedule.delay", scheduleSitemapsWithDelay);
+ int maxOffsetGuess = ConfUtils.getInt(stormConf, "sitemap.offset.guess", 1024);
+ contentDetector = new ContentDetector(NewsSiteMapParserBolt.contentClues, maxOffsetGuess);
+ rssContentDetector = new ContentDetector(FeedDetectorBolt.contentClues, maxOffsetGuess);
+ averagedMetrics =
+ context.registerMetric(
+ "news_sitemap_average_processing_time",
+ new ReducedMetric(new MeanReducer()),
+ 30);
+ scheduleSitemapsWithDelay =
+ ConfUtils.getInt(stormConf, "sitemap.schedule.delay", scheduleSitemapsWithDelay);
}
-
}
diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/PreFilterBolt.java b/src/main/java/org/commoncrawl/stormcrawler/news/PreFilterBolt.java
index b986506..18106c3 100644
--- a/src/main/java/org/commoncrawl/stormcrawler/news/PreFilterBolt.java
+++ b/src/main/java/org/commoncrawl/stormcrawler/news/PreFilterBolt.java
@@ -1,9 +1,22 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.commoncrawl.stormcrawler.news;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Map;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
@@ -19,68 +32,67 @@
import org.slf4j.LoggerFactory;
/**
- * Variant of the URLFilterBolt to go upstream of the fetching to catch anything
- * before it goes further into the topology. If filtered, a URL gets an ERROR
- * status.
+ * Variant of the URLFilterBolt to go upstream of the fetching to catch anything before it goes
+ * further into the topology. If filtered, a URL gets an ERROR status.
*/
public class PreFilterBolt extends BaseRichBolt {
- protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- private URLFilters urlFilters;
-
- protected OutputCollector collector;
-
- private final String filterConfigFile;
-
- private static final String _s = org.apache.stormcrawler.Constants.StatusStreamName;
-
- public PreFilterBolt(String filterConfigFile) {
- this.filterConfigFile = filterConfigFile;
- }
-
- @Override
- public void execute(Tuple input) {
-
- // must have at least a URL and metadata
- String urlString = input.getStringByField("url");
- Metadata metadata = (Metadata) input.getValueByField("metadata");
-
- String filtered = urlFilters.filter(null, null, urlString);
- if (StringUtils.isBlank(filtered)) {
- LOG.debug("URL rejected: {}", urlString);
- // emit with an error to the status stream
- metadata.addValue("error.cause", "Filtered");
- Values v = new Values(urlString, metadata, Status.ERROR);
- collector.emit(_s, input, v);
- collector.ack(input);
- return;
- }
-
- // pass to std out
- Values v = new Values(urlString, metadata);
- collector.emit(input, v);
- collector.ack(input);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declareStream(_s, new Fields("url", "metadata", "status"));
- declarer.declare(new Fields("url", "metadata"));
- }
-
- @Override
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- if (filterConfigFile != null) {
- try {
- urlFilters = new URLFilters(stormConf, filterConfigFile);
- } catch (IOException e) {
- throw new RuntimeException("Can't load filters from " + filterConfigFile);
- }
- } else {
- urlFilters = URLFilters.fromConf(stormConf);
- }
- }
-
+ protected static final Logger LOG =
+ LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private URLFilters urlFilters;
+
+ protected OutputCollector collector;
+
+ private final String filterConfigFile;
+
+ private static final String _s = org.apache.stormcrawler.Constants.StatusStreamName;
+
+ public PreFilterBolt(String filterConfigFile) {
+ this.filterConfigFile = filterConfigFile;
+ }
+
+ @Override
+ public void execute(Tuple input) {
+
+ // must have at least a URL and metadata
+ String urlString = input.getStringByField("url");
+ Metadata metadata = (Metadata) input.getValueByField("metadata");
+
+ String filtered = urlFilters.filter(null, null, urlString);
+ if (StringUtils.isBlank(filtered)) {
+ LOG.debug("URL rejected: {}", urlString);
+ // emit with an error to the status stream
+ metadata.addValue("error.cause", "Filtered");
+ Values v = new Values(urlString, metadata, Status.ERROR);
+ collector.emit(_s, input, v);
+ collector.ack(input);
+ return;
+ }
+
+ // pass to std out
+ Values v = new Values(urlString, metadata);
+ collector.emit(input, v);
+ collector.ack(input);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declareStream(_s, new Fields("url", "metadata", "status"));
+ declarer.declare(new Fields("url", "metadata"));
+ }
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ if (filterConfigFile != null) {
+ try {
+ urlFilters = new URLFilters(stormConf, filterConfigFile);
+ } catch (IOException e) {
+ throw new RuntimeException("Can't load filters from " + filterConfigFile);
+ }
+ } else {
+ urlFilters = URLFilters.fromConf(stormConf);
+ }
+ }
}
diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/PunycodeURLNormalizer.java b/src/main/java/org/commoncrawl/stormcrawler/news/PunycodeURLNormalizer.java
index 4adf03d..114477c 100644
--- a/src/main/java/org/commoncrawl/stormcrawler/news/PunycodeURLNormalizer.java
+++ b/src/main/java/org/commoncrawl/stormcrawler/news/PunycodeURLNormalizer.java
@@ -13,20 +13,18 @@
*/
package org.commoncrawl.stormcrawler.news;
+import com.fasterxml.jackson.databind.JsonNode;
import java.net.IDN;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Map;
-
import org.apache.stormcrawler.Metadata;
import org.apache.stormcrawler.filtering.URLFilter;
-import com.fasterxml.jackson.databind.JsonNode;
public class PunycodeURLNormalizer extends URLFilter {
@Override
- public void configure(Map stormConf, JsonNode filterParams) {
- }
+ public void configure(Map stormConf, JsonNode filterParams) {}
private boolean isAscii(String str) {
char[] chars = str.toCharArray();
@@ -39,8 +37,7 @@ private boolean isAscii(String str) {
}
@Override
- public String filter(URL sourceUrl, Metadata sourceMetadata,
- String urlToFilter) {
+ public String filter(URL sourceUrl, Metadata sourceMetadata, String urlToFilter) {
try {
URL url = new URL(urlToFilter);
String hostName = url.getHost();
@@ -51,12 +48,11 @@ public String filter(URL sourceUrl, Metadata sourceMetadata,
if (hostName.equals(url.getHost())) {
return urlToFilter;
}
- urlToFilter = new URL(url.getProtocol(), hostName, url.getPort(),
- url.getFile()).toString();
+ urlToFilter =
+ new URL(url.getProtocol(), hostName, url.getPort(), url.getFile()).toString();
} catch (MalformedURLException e) {
return null;
}
return urlToFilter;
}
-
}
diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/BootstrapTopology.java b/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/BootstrapTopology.java
index 821551e..14f45ff 100644
--- a/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/BootstrapTopology.java
+++ b/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/BootstrapTopology.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to DigitalPebble Ltd under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -19,31 +19,27 @@
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
-import org.commoncrawl.stormcrawler.news.CrawlTopology;
-import org.commoncrawl.stormcrawler.news.FeedDetectorBolt;
-import org.slf4j.LoggerFactory;
-
import org.apache.stormcrawler.ConfigurableTopology;
import org.apache.stormcrawler.Constants;
import org.apache.stormcrawler.bolt.FetcherBolt;
import org.apache.stormcrawler.bolt.JSoupParserBolt;
import org.apache.stormcrawler.bolt.URLFilterBolt;
import org.apache.stormcrawler.bolt.URLPartitionerBolt;
+import org.apache.stormcrawler.indexing.DummyIndexer;
import org.apache.stormcrawler.opensearch.persistence.AggregationSpout;
import org.apache.stormcrawler.opensearch.persistence.StatusUpdaterBolt;
-import org.apache.stormcrawler.indexing.DummyIndexer;
import org.apache.stormcrawler.spout.FileSpout;
import org.apache.stormcrawler.util.ConfUtils;
import org.apache.stormcrawler.util.URLStreamGrouping;
import org.apache.stormcrawler.warc.WARCHdfsBolt;
+import org.commoncrawl.stormcrawler.news.CrawlTopology;
+import org.commoncrawl.stormcrawler.news.FeedDetectorBolt;
+import org.slf4j.LoggerFactory;
-/**
- * Dummy topology to play with the spouts and bolts on ElasticSearch
- */
+/** Dummy topology to play with the spouts and bolts on ElasticSearch */
public class BootstrapTopology extends CrawlTopology {
- private static final org.slf4j.Logger LOG = LoggerFactory
- .getLogger(BootstrapTopology.class);
+ private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(BootstrapTopology.class);
public static void main(String[] args) throws Exception {
ConfigurableTopology.start(new BootstrapTopology(), args);
@@ -53,11 +49,14 @@ public static void main(String[] args) throws Exception {
protected int run(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
- LOG.debug("sitemap.sniffContent: {}",
+ LOG.debug(
+ "sitemap.sniffContent: {}",
ConfUtils.getBoolean(getConf(), "sitemap.sniffContent", false));
- LOG.info("sitemap.sniffContent: {}",
+ LOG.info(
+ "sitemap.sniffContent: {}",
ConfUtils.getBoolean(getConf(), "sitemap.sniffContent", false));
- LOG.warn("sitemap.sniffContent: {}",
+ LOG.warn(
+ "sitemap.sniffContent: {}",
ConfUtils.getBoolean(getConf(), "sitemap.sniffContent", false));
int numWorkers = ConfUtils.getInt(getConf(), "topology.workers", 1);
@@ -69,12 +68,11 @@ protected int run(String[] args) {
if (args.length >= 2) {
// arguments include seed directory and file pattern
LOG.info("Injecting seeds from {} by pattern {}", args[0], args[1]);
- builder.setSpout("filespout",
- new FileSpout(args[0], args[1], true));
+ builder.setSpout("filespout", new FileSpout(args[0], args[1], true));
Fields key = new Fields("url");
- builder.setBolt("filter", new URLFilterBolt()).fieldsGrouping(
- "filespout", Constants.StatusStreamName, key);
+ builder.setBolt("filter", new URLFilterBolt())
+ .fieldsGrouping("filespout", Constants.StatusStreamName, key);
}
builder.setSpout("spout", new AggregationSpout(), numShards);
@@ -91,12 +89,10 @@ protected int run(String[] args) {
builder.setBolt("feed", new FeedDetectorBolt(), numWorkers)
.localOrShuffleGrouping("sitemap");
- builder.setBolt("parse", new JSoupParserBolt())
- .localOrShuffleGrouping("feed");
+ builder.setBolt("parse", new JSoupParserBolt()).localOrShuffleGrouping("feed");
// don't need to parse the pages but need to update their status
- builder.setBolt("ssb", new DummyIndexer(), numWorkers)
- .localOrShuffleGrouping("parse");
+ builder.setBolt("ssb", new DummyIndexer(), numWorkers).localOrShuffleGrouping("parse");
WARCHdfsBolt warcbolt = getWarcBolt("CC-NEWS-BOOTSTRAP");
@@ -109,8 +105,7 @@ protected int run(String[] args) {
.localOrShuffleGrouping("parse", Constants.StatusStreamName)
.localOrShuffleGrouping("ssb", Constants.StatusStreamName)
.setNumTasks(numShards)
- .customGrouping("filter", Constants.StatusStreamName,
- new URLStreamGrouping());
+ .customGrouping("filter", Constants.StatusStreamName, new URLStreamGrouping());
return submit(conf, builder);
}
diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/FeedLinkParseFilter.java b/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/FeedLinkParseFilter.java
index 5707189..bb50291 100644
--- a/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/FeedLinkParseFilter.java
+++ b/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/FeedLinkParseFilter.java
@@ -14,19 +14,18 @@
package org.commoncrawl.stormcrawler.news.bootstrap;
import java.util.ArrayList;
-
-import org.slf4j.LoggerFactory;
-import org.w3c.dom.DocumentFragment;
-
import org.apache.stormcrawler.bolt.FeedParserBolt;
import org.apache.stormcrawler.parse.Outlink;
import org.apache.stormcrawler.parse.ParseResult;
import org.apache.stormcrawler.parse.filter.LinkParseFilter;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.DocumentFragment;
/**
- * ParseFilter which extracts exclusively RSS links via Xpath, all other links
- * are skipped. See {@link LinkParseFilter} how to register and configure in
- * parsefilters.json. A configuration snippet:
+ * ParseFilter which extracts exclusively RSS links via Xpath, all other links are skipped. See
+ * {@link LinkParseFilter} how to register and configure in parsefilters.json. A configuration
+ * snippet:
+ *
*
* {
* "class": "org.commoncrawl.stormcrawler.news.bootstrap.FeedLinkParseFilter",
@@ -41,12 +40,10 @@
*/
public class FeedLinkParseFilter extends LinkParseFilter {
- private static final org.slf4j.Logger LOG = LoggerFactory
- .getLogger(FeedLinkParseFilter.class);
+ private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(FeedLinkParseFilter.class);
@Override
- public void filter(String URL, byte[] content, DocumentFragment doc,
- ParseResult parse) {
+ public void filter(String URL, byte[] content, DocumentFragment doc, ParseResult parse) {
// skip existing links
logLinks(parse, URL, "Skipped links");
@@ -60,11 +57,8 @@ public void filter(String URL, byte[] content, DocumentFragment doc,
public static void logLinks(ParseResult parse, String URL, String message) {
if (LOG.isDebugEnabled() && parse.getOutlinks().size() > 0) {
- if (!message.isEmpty())
- LOG.debug("{} for {}:", message, URL);
- for (Outlink outlink : parse.getOutlinks())
- LOG.debug(outlink.getTargetURL());
+ if (!message.isEmpty()) LOG.debug("{} for {}:", message, URL);
+ for (Outlink outlink : parse.getOutlinks()) LOG.debug(outlink.getTargetURL());
}
}
-
}
diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/NewsSiteMapDetectorBolt.java b/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/NewsSiteMapDetectorBolt.java
index 1160201..f0af134 100644
--- a/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/NewsSiteMapDetectorBolt.java
+++ b/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/NewsSiteMapDetectorBolt.java
@@ -14,15 +14,10 @@
package org.commoncrawl.stormcrawler.news.bootstrap;
import java.util.Map;
-
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
-import org.commoncrawl.stormcrawler.news.ContentDetector;
-import org.commoncrawl.stormcrawler.news.NewsSiteMapParserBolt;
-import org.slf4j.LoggerFactory;
-
import org.apache.stormcrawler.Constants;
import org.apache.stormcrawler.Metadata;
import org.apache.stormcrawler.bolt.SiteMapParserBolt;
@@ -31,25 +26,26 @@
import org.apache.stormcrawler.parse.ParseFilters;
import org.apache.stormcrawler.parse.ParseResult;
import org.apache.stormcrawler.persistence.Status;
+import org.commoncrawl.stormcrawler.news.ContentDetector;
+import org.commoncrawl.stormcrawler.news.NewsSiteMapParserBolt;
+import org.slf4j.LoggerFactory;
/**
- * Detector for news
+ * Detector for news
* sitemaps and also sitemaps.
*/
@SuppressWarnings("serial")
public class NewsSiteMapDetectorBolt extends SiteMapParserBolt {
- private static final org.slf4j.Logger LOG = LoggerFactory
- .getLogger(NewsSiteMapDetectorBolt.class);
+ private static final org.slf4j.Logger LOG =
+ LoggerFactory.getLogger(NewsSiteMapDetectorBolt.class);
protected static final int maxOffsetContentGuess = 1024;
- private static ContentDetector contentDetector = new ContentDetector(
- NewsSiteMapParserBolt.contentClues, maxOffsetContentGuess);
+ private static ContentDetector contentDetector =
+ new ContentDetector(NewsSiteMapParserBolt.contentClues, maxOffsetContentGuess);
private ParseFilter parseFilters;
-
@Override
public void execute(Tuple tuple) {
Metadata metadata = (Metadata) tuple.getValueByField("metadata");
@@ -57,10 +53,9 @@ public void execute(Tuple tuple) {
byte[] content = tuple.getBinaryByField("content");
String url = tuple.getStringByField("url");
- boolean isSitemap = Boolean.valueOf(
- metadata.getFirstValue(SiteMapParserBolt.isSitemapKey));
- boolean isNewsSitemap = Boolean.valueOf(
- metadata.getFirstValue(NewsSiteMapParserBolt.isSitemapNewsKey));
+ boolean isSitemap = Boolean.valueOf(metadata.getFirstValue(SiteMapParserBolt.isSitemapKey));
+ boolean isNewsSitemap =
+ Boolean.valueOf(metadata.getFirstValue(NewsSiteMapParserBolt.isSitemapNewsKey));
if (!isNewsSitemap || !isSitemap) {
int match = contentDetector.getFirstMatch(content);
@@ -70,10 +65,8 @@ public void execute(Tuple tuple) {
metadata.setValue(SiteMapParserBolt.isSitemapKey, "true");
if (match <= NewsSiteMapParserBolt.contentCluesSitemapNewsMatchUpTo) {
isNewsSitemap = true;
- LOG.info("{} detected as news sitemap based on content",
- url);
- metadata.setValue(NewsSiteMapParserBolt.isSitemapNewsKey,
- "true");
+ LOG.info("{} detected as news sitemap based on content", url);
+ metadata.setValue(NewsSiteMapParserBolt.isSitemapNewsKey, "true");
}
}
}
@@ -85,8 +78,8 @@ public void execute(Tuple tuple) {
parseData.setMetadata(metadata);
parseFilters.filter(url, content, null, parse);
// emit status
- collector.emit(Constants.StatusStreamName, tuple,
- new Values(url, metadata, Status.FETCHED));
+ collector.emit(
+ Constants.StatusStreamName, tuple, new Values(url, metadata, Status.FETCHED));
} else {
// pass on
collector.emit(tuple, tuple.getValues());
@@ -95,11 +88,9 @@ public void execute(Tuple tuple) {
}
@Override
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public void prepare(Map stormConf, TopologyContext context,
- OutputCollector collect) {
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collect) {
super.prepare(stormConf, context, collect);
parseFilters = ParseFilters.fromConf(stormConf);
}
-
}
diff --git a/src/test/java/org/commoncrawl/stormcrawler/FastURLFilterTest.java b/src/test/java/org/commoncrawl/stormcrawler/FastURLFilterTest.java
index 64df0ba..58aaa04 100644
--- a/src/test/java/org/commoncrawl/stormcrawler/FastURLFilterTest.java
+++ b/src/test/java/org/commoncrawl/stormcrawler/FastURLFilterTest.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to DigitalPebble Ltd under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership.
* DigitalPebble licenses this file to You under the Apache License, Version 2.0 (the "License");
@@ -14,74 +14,72 @@
*/
package org.commoncrawl.stormcrawler;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
-
+import org.apache.stormcrawler.Metadata;
+import org.apache.stormcrawler.filtering.URLFilter;
import org.commoncrawl.stormcrawler.filter.FastURLFilter;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.apache.stormcrawler.Metadata;
-import org.apache.stormcrawler.filtering.URLFilter;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
public class FastURLFilterTest {
protected static URLFilter filter;
@BeforeClass
public static void init() {
- filter = createFilter("fast-urlfilter.txt");
+ filter = createFilter("fast-urlfilter.txt");
}
public static FastURLFilter createFilter(String fileName) {
- ObjectNode filterParams = new ObjectNode(JsonNodeFactory.instance);
- filterParams.put("file", fileName);
- FastURLFilter filter = new FastURLFilter();
- Map conf = new HashMap<>();
- conf.put("fast.urlfilter.refresh", 10);
- filter.configure(conf, filterParams);
- return filter;
+ ObjectNode filterParams = new ObjectNode(JsonNodeFactory.instance);
+ filterParams.put("file", fileName);
+ FastURLFilter filter = new FastURLFilter();
+ Map conf = new HashMap<>();
+ conf.put("fast.urlfilter.refresh", 10);
+ filter.configure(conf, filterParams);
+ return filter;
}
@Test
public void testHostFilter() throws MalformedURLException {
- URL url = new URL("http://may.go.com/image.jpg");
- Metadata metadata = new Metadata();
- String filterResult = filter.filter(url, metadata, url.toExternalForm());
- Assert.assertEquals(url.toString(), filterResult);
-
- url = new URL("http://no.go.com/");
- filterResult = filter.filter(url, metadata, url.toExternalForm());
- Assert.assertEquals(null, filterResult);
+ URL url = new URL("http://may.go.com/image.jpg");
+ Metadata metadata = new Metadata();
+ String filterResult = filter.filter(url, metadata, url.toExternalForm());
+ Assert.assertEquals(url.toString(), filterResult);
+
+ url = new URL("http://no.go.com/");
+ filterResult = filter.filter(url, metadata, url.toExternalForm());
+ Assert.assertEquals(null, filterResult);
}
@Test
public void testDomainNotAllowed() throws MalformedURLException {
- URL url = new URL("http://domainnotallowed.com/forum/search.php");
- Metadata metadata = new Metadata();
- String filterResult = filter.filter(url, metadata, url.toExternalForm());
- Assert.assertEquals(null, filterResult);
-
- url = new URL("http://domainnotallowed.com/");
- filterResult = filter.filter(url, metadata, url.toExternalForm());
- Assert.assertEquals(null, filterResult);
-
- url = new URL("http://partiallyallowed.com/");
- filterResult = filter.filter(url, metadata, url.toExternalForm());
- Assert.assertEquals(url.toString(), filterResult);
-
- url = new URL("http://partiallyallowed.com/verbotten");
- filterResult = filter.filter(url, metadata, url.toExternalForm());
- Assert.assertEquals(null, filterResult);
+ URL url = new URL("http://domainnotallowed.com/forum/search.php");
+ Metadata metadata = new Metadata();
+ String filterResult = filter.filter(url, metadata, url.toExternalForm());
+ Assert.assertEquals(null, filterResult);
+
+ url = new URL("http://domainnotallowed.com/");
+ filterResult = filter.filter(url, metadata, url.toExternalForm());
+ Assert.assertEquals(null, filterResult);
+
+ url = new URL("http://partiallyallowed.com/");
+ filterResult = filter.filter(url, metadata, url.toExternalForm());
+ Assert.assertEquals(url.toString(), filterResult);
+
+ url = new URL("http://partiallyallowed.com/verbotten");
+ filterResult = filter.filter(url, metadata, url.toExternalForm());
+ Assert.assertEquals(null, filterResult);
- // allowed
- url = new URL("http://digitalpebble.com/");
- filterResult = filter.filter(url, metadata, url.toExternalForm());
- Assert.assertEquals(url.toString(), filterResult);
+ // allowed
+ url = new URL("http://digitalpebble.com/");
+ filterResult = filter.filter(url, metadata, url.toExternalForm());
+ Assert.assertEquals(url.toString(), filterResult);
}
}
diff --git a/src/test/java/org/commoncrawl/stormcrawler/news/NewsSiteMapParserTest.java b/src/test/java/org/commoncrawl/stormcrawler/news/NewsSiteMapParserTest.java
index db73d67..b0a0d5a 100644
--- a/src/test/java/org/commoncrawl/stormcrawler/news/NewsSiteMapParserTest.java
+++ b/src/test/java/org/commoncrawl/stormcrawler/news/NewsSiteMapParserTest.java
@@ -16,6 +16,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
+import crawlercommons.sitemaps.UnknownFormatException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -25,75 +26,83 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
import org.apache.commons.io.IOUtils;
-import org.commoncrawl.stormcrawler.news.NewsSiteMapParserBolt.SitemapType;
-import org.junit.Before;
-import org.junit.Test;
-
import org.apache.stormcrawler.Metadata;
import org.apache.stormcrawler.parse.Outlink;
import org.apache.stormcrawler.parse.ParsingTester;
-
-import crawlercommons.sitemaps.UnknownFormatException;
+import org.commoncrawl.stormcrawler.news.NewsSiteMapParserBolt.SitemapType;
+import org.junit.Before;
+import org.junit.Test;
public class NewsSiteMapParserTest extends ParsingTester {
@Before
public void setupParserBolt() {
- setupParserBolt(new NewsSiteMapParserBolt());
- Map config = new HashMap<>();
- config.put("sitemap.sniffContent", true);
- // allow items published during the last week
- config.put("sitemap.filter.hours.since.modified", 168);
- prepareParserBolt("test.parsefilters.json", config);
+ setupParserBolt(new NewsSiteMapParserBolt());
+ Map config = new HashMap<>();
+ config.put("sitemap.sniffContent", true);
+ // allow items published during the last week
+ config.put("sitemap.filter.hours.since.modified", 168);
+ prepareParserBolt("test.parsefilters.json", config);
}
@Test
public void testSiteMapParser() throws IOException, UnknownFormatException {
- String url = "https://example.org/sitemap-news.xml";
- byte[] content = readContent("sitemap-news.xml");
- String contentType = "";
- Metadata parentMetadata = new Metadata();
- List links = new ArrayList<>();
-
- SitemapType type = ((NewsSiteMapParserBolt) bolt).detectContent(url, content);
- assertEquals(SitemapType.NEWS, type);
-
- ((NewsSiteMapParserBolt) bolt).parseSiteMap(url, content, contentType, parentMetadata, links);
-
- // unmodified sitemap:
- // - publication date is far in the past, link should be skipped
- // 2008-12-23
- assertEquals("Outdated link not skipped", 0, links.size());
-
- // now set the publication date to yesterday
- LocalDateTime yesterday = LocalDateTime.now().minusDays(1);
- content = (new String(content, StandardCharsets.UTF_8))
- .replace("2008-12-23", ""
- + yesterday.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")) + "")
- .getBytes(StandardCharsets.UTF_8);
- ((NewsSiteMapParserBolt) bolt).parseSiteMap(url, content, contentType, parentMetadata, links);
-
- assertEquals("Expected one and one additional link - image links are ignored", 2,
- links.size());
+ String url = "https://example.org/sitemap-news.xml";
+ byte[] content = readContent("sitemap-news.xml");
+ String contentType = "";
+ Metadata parentMetadata = new Metadata();
+ List links = new ArrayList<>();
+
+ SitemapType type = ((NewsSiteMapParserBolt) bolt).detectContent(url, content);
+ assertEquals(SitemapType.NEWS, type);
+
+ ((NewsSiteMapParserBolt) bolt)
+ .parseSiteMap(url, content, contentType, parentMetadata, links);
+
+ // unmodified sitemap:
+ // - publication date is far in the past, link should be skipped
+ // 2008-12-23
+ assertEquals("Outdated link not skipped", 0, links.size());
+
+ // now set the publication date to yesterday
+ LocalDateTime yesterday = LocalDateTime.now().minusDays(1);
+ content =
+ (new String(content, StandardCharsets.UTF_8))
+ .replace(
+ "2008-12-23",
+ ""
+ + yesterday.format(
+ DateTimeFormatter.ofPattern("yyyy-MM-dd"))
+ + "")
+ .getBytes(StandardCharsets.UTF_8);
+ ((NewsSiteMapParserBolt) bolt)
+ .parseSiteMap(url, content, contentType, parentMetadata, links);
+
+ assertEquals(
+ "Expected one and one additional link - image links are ignored",
+ 2,
+ links.size());
}
protected byte[] readContent(String filename) throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- IOUtils.copy(getClass().getClassLoader().getResourceAsStream(filename), baos);
- return baos.toByteArray();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ IOUtils.copy(getClass().getClassLoader().getResourceAsStream(filename), baos);
+ return baos.toByteArray();
}
- @Test
- public void testFeedWithSitemapNamespace() throws IOException, UnknownFormatException {
- String url = "https://example.org/feed.xml";
- byte[] content = readContent("feed-with-sitemap-namespace.xml");
- SitemapType type = ((NewsSiteMapParserBolt) bolt).detectContent(url, content);
- assertNotEquals("RSS feed with sitemap namespace should not be detected as sitemap",
- SitemapType.NEWS, type);
- assertNotEquals("RSS feed with sitemap namespace should not be detected as sitemap",
- SitemapType.SITEMAP, type);
- }
-
+ @Test
+ public void testFeedWithSitemapNamespace() throws IOException, UnknownFormatException {
+ String url = "https://example.org/feed.xml";
+ byte[] content = readContent("feed-with-sitemap-namespace.xml");
+ SitemapType type = ((NewsSiteMapParserBolt) bolt).detectContent(url, content);
+ assertNotEquals(
+ "RSS feed with sitemap namespace should not be detected as sitemap",
+ SitemapType.NEWS,
+ type);
+ assertNotEquals(
+ "RSS feed with sitemap namespace should not be detected as sitemap",
+ SitemapType.SITEMAP,
+ type);
+ }
}