From 4587a3cb919dcead9d1da39ade1c61cd31f73067 Mon Sep 17 00:00:00 2001
From: Luca Foppiano http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.commoncrawl.stormcrawler.filter;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.S3Object;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.LinkedHashMultimap;
+import com.google.common.collect.Multimap;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.IOException;
@@ -29,7 +37,6 @@
import java.util.TimerTask;
import java.util.regex.Pattern;
import java.util.zip.GZIPInputStream;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.stormcrawler.JSONResource;
import org.apache.stormcrawler.Metadata;
@@ -38,28 +45,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3ClientBuilder;
-import com.amazonaws.services.s3.model.GetObjectRequest;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.S3Object;
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.databind.JsonMappingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.google.common.collect.LinkedHashMultimap;
-import com.google.common.collect.Multimap;
-
/**
- * Version of the FastURLFilter that can load from a text representation instead
- * of the JSON that the SC version handles. Can also reload periodically and get
- * its content from S3.
- *
- * Filters URLs based on a file of regular expressions using host/domains
- * matching first. The default policy is to accept a URL if no matches are
- * found.
+ * Version of the FastURLFilter that can load from a text representation instead of the JSON that
+ * the SC version handles. Can also reload periodically and get its content from S3.
+ *
+ * Filters URLs based on a file of regular expressions using host/domains matching first. The
+ * default policy is to accept a URL if no matches are found.
+ *
+ * Rule Format:
*
- * Rule Format:
- *
* E.g., for "www.example.com" the rules given above are looked up in the following order:
+ *
* For rules either the URL path ( Rules are applied in the order of their definition. For better performance, regular
+ * expressions which are simpler/faster or match more URLs should be defined earlier.
+ *
+ * Comments in the rule file start with the The rules file is defined via the property http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.commoncrawl.stormcrawler.news;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
-
import org.apache.storm.topology.BoltDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
@@ -43,9 +39,7 @@
import org.apache.stormcrawler.warc.WARCHdfsBolt;
import org.slf4j.LoggerFactory;
-/**
- * Dummy topology to play with the spouts and bolts on OpenSearch
- */
+/** Dummy topology to play with the spouts and bolts on OpenSearch */
public class CrawlTopology extends ConfigurableTopology {
private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(CrawlTopology.class);
@@ -70,22 +64,28 @@ protected int run(String[] args) {
builder.setSpout("filespout", new FileSpout(args[0], args[1], true));
Fields key = new Fields("url");
- builder.setBolt("filter", new URLFilterBolt()).fieldsGrouping("filespout", Constants.StatusStreamName, key);
+ builder.setBolt("filter", new URLFilterBolt())
+ .fieldsGrouping("filespout", Constants.StatusStreamName, key);
}
builder.setSpout("spout", new AggregationSpout(), numShards);
- builder.setBolt("prefilter", new PreFilterBolt("pre-urlfilters.json"), numWorkers).shuffleGrouping("spout");
+ builder.setBolt("prefilter", new PreFilterBolt("pre-urlfilters.json"), numWorkers)
+ .shuffleGrouping("spout");
- builder.setBolt("partitioner", new URLPartitionerBolt(), numWorkers).shuffleGrouping("prefilter");
+ builder.setBolt("partitioner", new URLPartitionerBolt(), numWorkers)
+ .shuffleGrouping("prefilter");
- builder.setBolt("fetch", new FetcherBolt(), numWorkers).fieldsGrouping("partitioner", new Fields("key"));
+ builder.setBolt("fetch", new FetcherBolt(), numWorkers)
+ .fieldsGrouping("partitioner", new Fields("key"));
builder.setBolt("sitemap", new NewsSiteMapParserBolt(), numWorkers)
.setNumTasks(2)
.localOrShuffleGrouping("fetch");
- builder.setBolt("feed", new FeedParserBolt(), numWorkers).setNumTasks(4).localOrShuffleGrouping("sitemap");
+ builder.setBolt("feed", new FeedParserBolt(), numWorkers)
+ .setNumTasks(4)
+ .localOrShuffleGrouping("sitemap");
// don't need to parse the pages but need to update their status
builder.setBolt("ssb", new DummyIndexer(), numWorkers).localOrShuffleGrouping("feed");
@@ -98,15 +98,17 @@ protected int run(String[] args) {
final Fields furl = new Fields("url");
- BoltDeclarer statusBolt = builder.setBolt("status", new StatusUpdaterBolt(), numWorkers)
- .fieldsGrouping("fetch", Constants.StatusStreamName, furl)
- .fieldsGrouping("sitemap", Constants.StatusStreamName, furl)
- .fieldsGrouping("feed", Constants.StatusStreamName, furl)
- .fieldsGrouping("ssb", Constants.StatusStreamName, furl)
- .fieldsGrouping("prefilter", Constants.StatusStreamName, furl);
+ BoltDeclarer statusBolt =
+ builder.setBolt("status", new StatusUpdaterBolt(), numWorkers)
+ .fieldsGrouping("fetch", Constants.StatusStreamName, furl)
+ .fieldsGrouping("sitemap", Constants.StatusStreamName, furl)
+ .fieldsGrouping("feed", Constants.StatusStreamName, furl)
+ .fieldsGrouping("ssb", Constants.StatusStreamName, furl)
+ .fieldsGrouping("prefilter", Constants.StatusStreamName, furl);
if (args.length >= 2) {
- statusBolt.customGrouping("filter", Constants.StatusStreamName, new URLStreamGrouping());
+ statusBolt.customGrouping(
+ "filter", Constants.StatusStreamName, new URLStreamGrouping());
}
statusBolt.setNumTasks(numShards);
@@ -127,12 +129,15 @@ protected WARCHdfsBolt getWarcBolt(String filePrefix) {
String userAgent = AbstractHttpProtocol.getAgentString(getConf());
fields.put("http-header-user-agent", userAgent);
fields.put("http-header-from", ConfUtils.getString(getConf(), "http.agent.email"));
- String robotsTxtParser = "checked by crawler-commons "
- + crawlercommons.CrawlerCommons.getVersion()
- + " (https://github.com/crawler-commons/crawler-commons)";
+ String robotsTxtParser =
+ "checked by crawler-commons "
+ + crawlercommons.CrawlerCommons.getVersion()
+ + " (https://github.com/crawler-commons/crawler-commons)";
fields.put("robots", robotsTxtParser);
fields.put("format", "WARC File Format 1.1");
- fields.put("conformsTo", "https://iipc.github.io/warc-specifications/specifications/warc-format/warc-1.1/");
+ fields.put(
+ "conformsTo",
+ "https://iipc.github.io/warc-specifications/specifications/warc-format/warc-1.1/");
WARCHdfsBolt warcbolt = (WARCHdfsBolt) new WARCHdfsBolt();
warcbolt.withConfigKey("warc");
@@ -156,5 +161,4 @@ protected WARCHdfsBolt getWarcBolt(String filePrefix) {
return warcbolt;
}
-
}
diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/FeedDetectorBolt.java b/src/main/java/org/commoncrawl/stormcrawler/news/FeedDetectorBolt.java
index 6fed661..e607dc2 100644
--- a/src/main/java/org/commoncrawl/stormcrawler/news/FeedDetectorBolt.java
+++ b/src/main/java/org/commoncrawl/stormcrawler/news/FeedDetectorBolt.java
@@ -14,13 +14,11 @@
package org.commoncrawl.stormcrawler.news;
import java.util.Map;
-
+import org.apache.http.HttpHeaders;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
-import org.slf4j.LoggerFactory;
-
import org.apache.stormcrawler.Constants;
import org.apache.stormcrawler.Metadata;
import org.apache.stormcrawler.bolt.FeedParserBolt;
@@ -29,7 +27,7 @@
import org.apache.stormcrawler.parse.ParseFilters;
import org.apache.stormcrawler.parse.ParseResult;
import org.apache.stormcrawler.persistence.Status;
-import org.apache.http.HttpHeaders;
+import org.slf4j.LoggerFactory;
/** Detect RSS and Atom feeds, but do not parse and extract links */
@SuppressWarnings("serial")
@@ -41,7 +39,8 @@ public class FeedDetectorBolt extends FeedParserBolt {
public static String[][] contentClues = {{" http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.commoncrawl.stormcrawler.news.bootstrap;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
-import org.commoncrawl.stormcrawler.news.CrawlTopology;
-import org.commoncrawl.stormcrawler.news.FeedDetectorBolt;
-import org.slf4j.LoggerFactory;
-
import org.apache.stormcrawler.ConfigurableTopology;
import org.apache.stormcrawler.Constants;
import org.apache.stormcrawler.bolt.FetcherBolt;
import org.apache.stormcrawler.bolt.JSoupParserBolt;
import org.apache.stormcrawler.bolt.URLFilterBolt;
import org.apache.stormcrawler.bolt.URLPartitionerBolt;
+import org.apache.stormcrawler.indexing.DummyIndexer;
import org.apache.stormcrawler.opensearch.persistence.AggregationSpout;
import org.apache.stormcrawler.opensearch.persistence.StatusUpdaterBolt;
-import org.apache.stormcrawler.indexing.DummyIndexer;
import org.apache.stormcrawler.spout.FileSpout;
import org.apache.stormcrawler.util.ConfUtils;
import org.apache.stormcrawler.util.URLStreamGrouping;
import org.apache.stormcrawler.warc.WARCHdfsBolt;
+import org.commoncrawl.stormcrawler.news.CrawlTopology;
+import org.commoncrawl.stormcrawler.news.FeedDetectorBolt;
+import org.slf4j.LoggerFactory;
-/**
- * Dummy topology to play with the spouts and bolts on ElasticSearch
- */
+/** Dummy topology to play with the spouts and bolts on ElasticSearch */
public class BootstrapTopology extends CrawlTopology {
private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(BootstrapTopology.class);
@@ -52,9 +46,15 @@ public static void main(String[] args) throws Exception {
protected int run(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
- LOG.debug("sitemap.sniffContent: {}", ConfUtils.getBoolean(getConf(), "sitemap.sniffContent", false));
- LOG.info("sitemap.sniffContent: {}", ConfUtils.getBoolean(getConf(), "sitemap.sniffContent", false));
- LOG.warn("sitemap.sniffContent: {}", ConfUtils.getBoolean(getConf(), "sitemap.sniffContent", false));
+ LOG.debug(
+ "sitemap.sniffContent: {}",
+ ConfUtils.getBoolean(getConf(), "sitemap.sniffContent", false));
+ LOG.info(
+ "sitemap.sniffContent: {}",
+ ConfUtils.getBoolean(getConf(), "sitemap.sniffContent", false));
+ LOG.warn(
+ "sitemap.sniffContent: {}",
+ ConfUtils.getBoolean(getConf(), "sitemap.sniffContent", false));
int numWorkers = ConfUtils.getInt(getConf(), "topology.workers", 1);
@@ -68,18 +68,23 @@ protected int run(String[] args) {
builder.setSpout("filespout", new FileSpout(args[0], args[1], true));
Fields key = new Fields("url");
- builder.setBolt("filter", new URLFilterBolt()).fieldsGrouping("filespout", Constants.StatusStreamName, key);
+ builder.setBolt("filter", new URLFilterBolt())
+ .fieldsGrouping("filespout", Constants.StatusStreamName, key);
}
builder.setSpout("spout", new AggregationSpout(), numShards);
- builder.setBolt("partitioner", new URLPartitionerBolt(), numWorkers).shuffleGrouping("spout");
+ builder.setBolt("partitioner", new URLPartitionerBolt(), numWorkers)
+ .shuffleGrouping("spout");
- builder.setBolt("fetch", new FetcherBolt(), numWorkers).fieldsGrouping("partitioner", new Fields("key"));
+ builder.setBolt("fetch", new FetcherBolt(), numWorkers)
+ .fieldsGrouping("partitioner", new Fields("key"));
- builder.setBolt("sitemap", new NewsSiteMapDetectorBolt(), numWorkers).localOrShuffleGrouping("fetch");
+ builder.setBolt("sitemap", new NewsSiteMapDetectorBolt(), numWorkers)
+ .localOrShuffleGrouping("fetch");
- builder.setBolt("feed", new FeedDetectorBolt(), numWorkers).localOrShuffleGrouping("sitemap");
+ builder.setBolt("feed", new FeedDetectorBolt(), numWorkers)
+ .localOrShuffleGrouping("sitemap");
builder.setBolt("parse", new JSoupParserBolt()).localOrShuffleGrouping("feed");
diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/FeedLinkParseFilter.java b/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/FeedLinkParseFilter.java
index 6ac664d..bb50291 100644
--- a/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/FeedLinkParseFilter.java
+++ b/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/FeedLinkParseFilter.java
@@ -14,20 +14,18 @@
package org.commoncrawl.stormcrawler.news.bootstrap;
import java.util.ArrayList;
-
-import org.slf4j.LoggerFactory;
-import org.w3c.dom.DocumentFragment;
-
import org.apache.stormcrawler.bolt.FeedParserBolt;
import org.apache.stormcrawler.parse.Outlink;
import org.apache.stormcrawler.parse.ParseResult;
import org.apache.stormcrawler.parse.filter.LinkParseFilter;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.DocumentFragment;
/**
- * ParseFilter which extracts exclusively RSS links via Xpath, all other links
- * are skipped. See {@link LinkParseFilter} how to register and configure in
- * parsefilters.json. A configuration snippet:
- *
+ * ParseFilter which extracts exclusively RSS links via Xpath, all other links are skipped. See
+ * {@link LinkParseFilter} how to register and configure in parsefilters.json. A configuration
+ * snippet:
+ *
* http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.commoncrawl.stormcrawler.filter;
diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java b/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java
index 9368d2c..cc2d85a 100644
--- a/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java
+++ b/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java
@@ -1,15 +1,17 @@
-/**
- * Licensed to DigitalPebble Ltd under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership.
- * DigitalPebble licenses this file to You under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License. You may obtain a copy of the
- * License at
+/*
+ * Licensed to DigitalPebble Ltd under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * DigitalPebble licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.commoncrawl.stormcrawler.news;
diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/BootstrapTopology.java b/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/BootstrapTopology.java
index 5b46acf..14f45ff 100644
--- a/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/BootstrapTopology.java
+++ b/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/BootstrapTopology.java
@@ -1,17 +1,20 @@
-/**
- * Licensed to DigitalPebble Ltd under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership.
- * DigitalPebble licenses this file to You under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License. You may obtain a copy of the
- * License at
+/*
+ * Licensed to DigitalPebble Ltd under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * DigitalPebble licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.commoncrawl.stormcrawler.news.bootstrap;
import org.apache.storm.topology.TopologyBuilder;
diff --git a/src/test/java/org/commoncrawl/stormcrawler/FastURLFilterTest.java b/src/test/java/org/commoncrawl/stormcrawler/FastURLFilterTest.java
index 1931aca..58aaa04 100644
--- a/src/test/java/org/commoncrawl/stormcrawler/FastURLFilterTest.java
+++ b/src/test/java/org/commoncrawl/stormcrawler/FastURLFilterTest.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to DigitalPebble Ltd under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership.
* DigitalPebble licenses this file to You under the Apache License, Version 2.0 (the "License");
From 0d0606ddfbfeea9348393e3ff8f2e33495d704ba Mon Sep 17 00:00:00 2001
From: Luca Foppiano urlfilter.fast.file,
* the default name is fast-urlfilter.txt.
*/
-public class FastURLFilter extends URLFilter implements JSONResource {
+public class FastURLFilter extends URLFilter implements JSONResource {
protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -127,41 +127,41 @@ public class FastURLFilter extends URLFilter implements JSONResource {
public void configure(@SuppressWarnings("rawtypes") Map stormConf, JsonNode filterParams) {
- // read from conf first
- int refreshRate = ConfUtils.getInt(stormConf, "fast.urlfilter.refresh", -1);
- this.resourceFile = ConfUtils.getString(stormConf, "fast.urlfilter.file", null);
-
- // then from the param file (which needs recompiling in case of change)
- if (filterParams != null) {
- JsonNode node = filterParams.get("file");
- if (node != null && node.isTextual() && this.resourceFile == null) {
- this.resourceFile = node.asText();
- }
- node = filterParams.get("refresh");
- if (node != null && node.isInt() && refreshRate == -1) {
- refreshRate = node.asInt();
- }
- }
-
- try {
- loadJSONResources();
- } catch (Exception e) {
- LOG.error("Exception while loading resources", e);
- }
-
- if (refreshRate != -1) {
- LOG.info("Filter set to reload from {} every {} sec", getResourceFile(), refreshRate);
- new Timer().schedule(new TimerTask() {
- public void run() {
- LOG.info("Reloading resources");
- try {
- loadJSONResources();
- } catch (Exception e) {
- LOG.error("Can't load resources", e);
- }
- }
- }, refreshRate * 1000, refreshRate * 1000);
- }
+ // read from conf first
+ int refreshRate = ConfUtils.getInt(stormConf, "fast.urlfilter.refresh", -1);
+ this.resourceFile = ConfUtils.getString(stormConf, "fast.urlfilter.file", null);
+
+ // then from the param file (which needs recompiling in case of change)
+ if (filterParams != null) {
+ JsonNode node = filterParams.get("file");
+ if (node != null && node.isTextual() && this.resourceFile == null) {
+ this.resourceFile = node.asText();
+ }
+ node = filterParams.get("refresh");
+ if (node != null && node.isInt() && refreshRate == -1) {
+ refreshRate = node.asInt();
+ }
+ }
+
+ try {
+ loadJSONResources();
+ } catch (Exception e) {
+ LOG.error("Exception while loading resources", e);
+ }
+
+ if (refreshRate != -1) {
+ LOG.info("Filter set to reload from {} every {} sec", getResourceFile(), refreshRate);
+ new Timer().schedule(new TimerTask() {
+ public void run() {
+ LOG.info("Reloading resources");
+ try {
+ loadJSONResources();
+ } catch (Exception e) {
+ LOG.error("Can't load resources", e);
+ }
+ }
+ }, refreshRate * 1000, refreshRate * 1000);
+ }
}
/**
@@ -171,252 +171,256 @@ public void run() {
**/
@Override
public void loadJSONResources() throws Exception {
- InputStream inputStream = null;
- AmazonS3 s3client = null;
- try {
- if (getResourceFile().startsWith("s3://")) {
- // try loading from S3
- s3client = AmazonS3ClientBuilder.standard().build();
- java.net.URI uri = new java.net.URI(getResourceFile());
-
- String bucketName = uri.getHost();
- // remove the first "/"
- String path = uri.getPath().substring(1);
-
- // optimisation - avoid a full reload if the resource has not changed
- ObjectMetadata metadata = s3client.getObjectMetadata(bucketName, path);
- final String ETAG = metadata.getETag();
- if (ETAG != null && ETAG.equals(resourceETAG)) {
- LOG.info("Unchanged ETAG for {} - skipping reload", getResourceFile());
- return;
- } else {
- resourceETAG = ETAG;
- }
-
- final S3Object object = s3client.getObject(new GetObjectRequest(bucketName, path));
- inputStream = object.getObjectContent();
- } else {
- inputStream = getClass().getClassLoader().getResourceAsStream(getResourceFile());
- if (inputStream == null) {
- LOG.error("Can't load conf from {}", getResourceFile());
- return;
- }
- }
- if (getResourceFile().endsWith(".gz")) {
- inputStream = new GZIPInputStream(inputStream);
- }
-
- loadJSONResources(new BufferedInputStream(inputStream));
- } finally {
- if (inputStream != null) {
- inputStream.close();
- }
- if (s3client != null) {
- s3client.shutdown();
- }
- }
+ InputStream inputStream = null;
+ AmazonS3 s3client = null;
+ try {
+ if (getResourceFile().startsWith("s3://")) {
+ // try loading from S3
+ s3client = AmazonS3ClientBuilder.standard().build();
+ java.net.URI uri = new java.net.URI(getResourceFile());
+
+ String bucketName = uri.getHost();
+ // remove the first "/"
+ String path = uri.getPath().substring(1);
+
+ // optimisation - avoid a full reload if the resource has not changed
+ ObjectMetadata metadata = s3client.getObjectMetadata(bucketName, path);
+ final String ETAG = metadata.getETag();
+ if (ETAG != null && ETAG.equals(resourceETAG)) {
+ LOG.info("Unchanged ETAG for {} - skipping reload", getResourceFile());
+ return;
+ } else {
+ resourceETAG = ETAG;
+ }
+
+ final S3Object object = s3client.getObject(new GetObjectRequest(bucketName, path));
+ inputStream = object.getObjectContent();
+ } else {
+ inputStream = getClass().getClassLoader().getResourceAsStream(getResourceFile());
+ if (inputStream == null) {
+ LOG.error("Can't load conf from {}", getResourceFile());
+ return;
+ }
+ }
+ if (getResourceFile().endsWith(".gz")) {
+ inputStream = new GZIPInputStream(inputStream);
+ }
+
+ loadJSONResources(new BufferedInputStream(inputStream));
+ } finally {
+ if (inputStream != null) {
+ inputStream.close();
+ }
+ if (s3client != null) {
+ s3client.shutdown();
+ }
+ }
}
@Override
public void loadJSONResources(InputStream inputStream)
- throws JsonParseException, JsonMappingException, IOException {
- long start = System.currentTimeMillis();
-
- try (Reader r = new InputStreamReader(inputStream)) {
- reloadRules(r);
- }
-
- long end = System.currentTimeMillis();
- LOG.info("Loaded {} hostrules and {} domain rules in {} msec from {}", hostRules.size(), domainRules.size(),
- (end - start), resourceFile);
+ throws JsonParseException, JsonMappingException, IOException {
+ long start = System.currentTimeMillis();
+
+ try (Reader r = new InputStreamReader(inputStream)) {
+ reloadRules(r);
+ }
+
+ long end = System.currentTimeMillis();
+ LOG.info(
+ "Loaded {} hostrules and {} domain rules in {} msec from {}",
+ hostRules.size(),
+ domainRules.size(),
+ (end - start),
+ resourceFile);
}
@Override
public String getResourceFile() {
- return resourceFile;
+ return resourceFile;
}
@Override
public String filter(URL sourceUrl, Metadata sourceMetadata, String urlToFilter) {
- synchronized (this) {
- URL u;
-
- try {
- u = new URL(urlToFilter);
- } catch (Exception e) {
- LOG.debug("Rejected {} because failed to parse as URL: {}", urlToFilter, e.getMessage());
- return null;
- }
-
- String hostname = u.getHost();
-
- // first check for host-specific rules
- for (Rule rule : hostRules.get(hostname)) {
- if (rule.match(u)) {
- return null;
- }
- }
-
- // also look up domain rules for host name
- for (Rule rule : domainRules.get(hostname)) {
- if (rule.match(u)) {
- return null;
- }
- }
-
- // check suffixes of host name from longer to shorter:
- // subdomains, domain, top-level domain
- int start = 0;
- int pos;
- while ((pos = hostname.indexOf('.', start)) != -1) {
- start = pos + 1;
- String domain = hostname.substring(start);
- for (Rule rule : domainRules.get(domain)) {
- if (rule.match(u)) {
- return null;
- }
- }
- }
-
- // finally check "global" rules defined for `Domain .`
- for (Rule rule : domainRules.get(".")) {
- if (rule.match(u)) {
- return null;
- }
- }
-
- // no reject rules found
- return urlToFilter;
- }
+ synchronized (this) {
+ URL u;
+
+ try {
+ u = new URL(urlToFilter);
+ } catch (Exception e) {
+ LOG.debug("Rejected {} because failed to parse as URL: {}", urlToFilter, e.getMessage());
+ return null;
+ }
+
+ String hostname = u.getHost();
+
+ // first check for host-specific rules
+ for (Rule rule : hostRules.get(hostname)) {
+ if (rule.match(u)) {
+ return null;
+ }
+ }
+
+ // also look up domain rules for host name
+ for (Rule rule : domainRules.get(hostname)) {
+ if (rule.match(u)) {
+ return null;
+ }
+ }
+
+ // check suffixes of host name from longer to shorter:
+ // subdomains, domain, top-level domain
+ int start = 0;
+ int pos;
+ while ((pos = hostname.indexOf('.', start)) != -1) {
+ start = pos + 1;
+ String domain = hostname.substring(start);
+ for (Rule rule : domainRules.get(domain)) {
+ if (rule.match(u)) {
+ return null;
+ }
+ }
+ }
+
+ // finally check "global" rules defined for `Domain .`
+ for (Rule rule : domainRules.get(".")) {
+ if (rule.match(u)) {
+ return null;
+ }
+ }
+
+ // no reject rules found
+ return urlToFilter;
+ }
}
private void reloadRules(Reader rules) throws IOException {
- synchronized (this) {
- domainRules.clear();
- hostRules.clear();
-
- BufferedReader reader = new BufferedReader(rules);
-
- String current = null;
- boolean host = false;
- int lineno = 0;
-
- String line;
- try {
- while ((line = reader.readLine()) != null) {
- lineno++;
- line = line.trim();
-
- if (line.indexOf("#") != -1) {
- // strip comments
- line = line.substring(0, line.indexOf("#")).trim();
- }
-
- if (StringUtils.isBlank(line)) {
- continue;
- }
-
- if (line.startsWith("Host")) {
- host = true;
- current = line.split("\\s+")[1];
- } else if (line.startsWith("Domain")) {
- host = false;
- current = line.split("\\s+")[1];
- } else {
- if (current == null) {
- continue;
- }
-
- Rule rule = null;
- try {
- if (CATCH_ALL_RULE.matcher(line).matches()) {
- rule = DenyAllRule.getInstance();
- } else if (line.startsWith("DenyPathQuery")) {
- rule = new DenyPathQueryRule(line.split("\\s+")[1]);
- } else if (line.startsWith("DenyPath")) {
- rule = new DenyPathRule(line.split("\\s+")[1]);
- } else {
- LOG.warn("Problem reading rule on line {}: {}", lineno, line);
- continue;
- }
- } catch (Exception e) {
- LOG.warn("Problem reading rule on line {}: {} - {}", lineno, line, e.getMessage());
- continue;
- }
-
- if (host) {
- LOG.trace("Adding host rule [{}] [{}]", current, rule);
- hostRules.put(current, rule);
- } else {
- LOG.trace("Adding domain rule [{}] [{}]", current, rule);
- domainRules.put(current, rule);
- }
- }
- }
-
- } catch (IOException e) {
- LOG.warn("Caught exception while reading rules file at line {}: {}", lineno, e.getMessage());
- throw e;
- }
- }
+ synchronized (this) {
+ domainRules.clear();
+ hostRules.clear();
+
+ BufferedReader reader = new BufferedReader(rules);
+
+ String current = null;
+ boolean host = false;
+ int lineno = 0;
+
+ String line;
+ try {
+ while ((line = reader.readLine()) != null) {
+ lineno++;
+ line = line.trim();
+
+ if (line.indexOf("#") != -1) {
+ // strip comments
+ line = line.substring(0, line.indexOf("#")).trim();
+ }
+
+ if (StringUtils.isBlank(line)) {
+ continue;
+ }
+
+ if (line.startsWith("Host")) {
+ host = true;
+ current = line.split("\\s+")[1];
+ } else if (line.startsWith("Domain")) {
+ host = false;
+ current = line.split("\\s+")[1];
+ } else {
+ if (current == null) {
+ continue;
+ }
+
+ Rule rule = null;
+ try {
+ if (CATCH_ALL_RULE.matcher(line).matches()) {
+ rule = DenyAllRule.getInstance();
+ } else if (line.startsWith("DenyPathQuery")) {
+ rule = new DenyPathQueryRule(line.split("\\s+")[1]);
+ } else if (line.startsWith("DenyPath")) {
+ rule = new DenyPathRule(line.split("\\s+")[1]);
+ } else {
+ LOG.warn("Problem reading rule on line {}: {}", lineno, line);
+ continue;
+ }
+ } catch (Exception e) {
+ LOG.warn("Problem reading rule on line {}: {} - {}", lineno, line, e.getMessage());
+ continue;
+ }
+
+ if (host) {
+ LOG.trace("Adding host rule [{}] [{}]", current, rule);
+ hostRules.put(current, rule);
+ } else {
+ LOG.trace("Adding domain rule [{}] [{}]", current, rule);
+ domainRules.put(current, rule);
+ }
+ }
+ }
+
+ } catch (IOException e) {
+ LOG.warn("Caught exception while reading rules file at line {}: {}", lineno, e.getMessage());
+ throw e;
+ }
+ }
}
public static class Rule {
- protected Pattern pattern;
+ protected Pattern pattern;
- Rule() {
- }
+ Rule() {
+ }
- public Rule(String regex) {
- pattern = Pattern.compile(regex);
- }
+ public Rule(String regex) {
+ pattern = Pattern.compile(regex);
+ }
- public boolean match(URL url) {
- return pattern.matcher(url.toString()).find();
- }
+ public boolean match(URL url) {
+ return pattern.matcher(url.toString()).find();
+ }
- public String toString() {
- return pattern.toString();
- }
+ public String toString() {
+ return pattern.toString();
+ }
}
public static class DenyPathRule extends Rule {
- public DenyPathRule(String regex) {
- super(regex);
- }
-
- public boolean match(URL url) {
- String haystack = url.getPath();
- return pattern.matcher(haystack).find();
- }
+ public DenyPathRule(String regex) {
+ super(regex);
+ }
+
+ public boolean match(URL url) {
+ String haystack = url.getPath();
+ return pattern.matcher(haystack).find();
+ }
}
/** Rule for DenyPath .* or DenyPath .? */
public static class DenyAllRule extends Rule {
- private static Rule instance = new DenyAllRule(".");
+ private static Rule instance = new DenyAllRule(".");
- private DenyAllRule(String regex) {
- super(regex);
- }
+ private DenyAllRule(String regex) {
+ super(regex);
+ }
- public static Rule getInstance() {
- return instance;
- }
+ public static Rule getInstance() {
+ return instance;
+ }
- public boolean match(URL url) {
- return true;
- }
+ public boolean match(URL url) {
+ return true;
+ }
}
public static class DenyPathQueryRule extends Rule {
- public DenyPathQueryRule(String regex) {
- super(regex);
- }
-
- public boolean match(URL url) {
- String haystack = url.getFile();
- return pattern.matcher(haystack).find();
- }
+ public DenyPathQueryRule(String regex) {
+ super(regex);
+ }
+
+ public boolean match(URL url) {
+ String haystack = url.getFile();
+ return pattern.matcher(haystack).find();
+ }
}
}
diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/ContentDetector.java b/src/main/java/org/commoncrawl/stormcrawler/news/ContentDetector.java
index 1c3a4a5..7305195 100644
--- a/src/main/java/org/commoncrawl/stormcrawler/news/ContentDetector.java
+++ b/src/main/java/org/commoncrawl/stormcrawler/news/ContentDetector.java
@@ -27,19 +27,17 @@ public class ContentDetector {
* Set up detector to detect content sniffing for a set of clue strings in a
* prefix of the binary content.
*
- * @param clues
- * nested list of literal clues. Outer list defines an OR-group,
- * inner list contained ANDed clues required to match all, e.g.
- * the following definition would match if either
- * "clue1" and "and_clue2" are matched, or
- * alternatively "or_clue3" is found
+ * @param clues nested list of literal clues. Outer list defines an
+ * OR-group, inner list contained ANDed clues required to match
+ * all, e.g. the following definition would match if either
+ * "clue1" and "and_clue2" are matched, or
+ * alternatively "or_clue3" is found
*
- *
- * { { clue1, and_clue2 }, { or_clue3 } }
- *
+ *
+ * { { clue1, and_clue2 }, { or_clue3 } }
+ *
*
- * @param maxOffset
- * max. offset of content prefix checked for clues
+ * @param maxOffset max. offset of content prefix checked for clues
*/
public ContentDetector(String[][] clues, int maxOffset) {
this.maxOffset = maxOffset;
@@ -56,8 +54,7 @@ public int getFirstMatch(byte[] content) {
if (content.length > maxOffset) {
beginning = Arrays.copyOfRange(content, 0, maxOffset);
}
- OR:
- for (int i = 0; i < clues.length; i++) {
+ OR : for (int i = 0; i < clues.length; i++) {
byte[][] group = clues[i];
for (byte[] clue : group) {
if (Bytes.indexOf(beginning, clue) == -1)
diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java b/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java
index 3a3c018..3c5d247 100644
--- a/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java
+++ b/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java
@@ -51,108 +51,110 @@ public class CrawlTopology extends ConfigurableTopology {
private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(CrawlTopology.class);
public static void main(String[] args) throws Exception {
- ConfigurableTopology.start(new CrawlTopology(), args);
+ ConfigurableTopology.start(new CrawlTopology(), args);
}
@Override
protected int run(String[] args) {
- TopologyBuilder builder = new TopologyBuilder();
+ TopologyBuilder builder = new TopologyBuilder();
- int numWorkers = ConfUtils.getInt(getConf(), "topology.workers", 1);
+ int numWorkers = ConfUtils.getInt(getConf(), "topology.workers", 1);
- // set to the real number of shards ONLY if es.status.routing is set to
- // true in the configuration
- int numShards = 16;
+ // set to the real number of shards ONLY if es.status.routing is set to
+ // true in the configuration
+ int numShards = 16;
- if (args.length >= 2) {
- // arguments include seed directory and file pattern
- LOG.info("Injecting seeds from {} by pattern {}", args[0], args[1]);
- builder.setSpout("filespout", new FileSpout(args[0], args[1], true));
- Fields key = new Fields("url");
+ if (args.length >= 2) {
+ // arguments include seed directory and file pattern
+ LOG.info("Injecting seeds from {} by pattern {}", args[0], args[1]);
+ builder.setSpout("filespout", new FileSpout(args[0], args[1], true));
+ Fields key = new Fields("url");
- builder.setBolt("filter", new URLFilterBolt()).fieldsGrouping("filespout", Constants.StatusStreamName, key);
- }
+ builder.setBolt("filter", new URLFilterBolt()).fieldsGrouping("filespout", Constants.StatusStreamName, key);
+ }
- builder.setSpout("spout", new AggregationSpout(), numShards);
+ builder.setSpout("spout", new AggregationSpout(), numShards);
- builder.setBolt("prefilter", new PreFilterBolt("pre-urlfilters.json"), numWorkers).shuffleGrouping("spout");
+ builder.setBolt("prefilter", new PreFilterBolt("pre-urlfilters.json"), numWorkers).shuffleGrouping("spout");
- builder.setBolt("partitioner", new URLPartitionerBolt(), numWorkers).shuffleGrouping("prefilter");
+ builder.setBolt("partitioner", new URLPartitionerBolt(), numWorkers).shuffleGrouping("prefilter");
- builder.setBolt("fetch", new FetcherBolt(), numWorkers).fieldsGrouping("partitioner", new Fields("key"));
+ builder.setBolt("fetch", new FetcherBolt(), numWorkers).fieldsGrouping("partitioner", new Fields("key"));
- builder.setBolt("sitemap", new NewsSiteMapParserBolt(), numWorkers).setNumTasks(2)
- .localOrShuffleGrouping("fetch");
+ builder.setBolt("sitemap", new NewsSiteMapParserBolt(), numWorkers)
+ .setNumTasks(2)
+ .localOrShuffleGrouping("fetch");
- builder.setBolt("feed", new FeedParserBolt(), numWorkers).setNumTasks(4).localOrShuffleGrouping("sitemap");
+ builder.setBolt("feed", new FeedParserBolt(), numWorkers).setNumTasks(4).localOrShuffleGrouping("sitemap");
- // don't need to parse the pages but need to update their status
- builder.setBolt("ssb", new DummyIndexer(), numWorkers).localOrShuffleGrouping("feed");
+ // don't need to parse the pages but need to update their status
+ builder.setBolt("ssb", new DummyIndexer(), numWorkers).localOrShuffleGrouping("feed");
- WARCHdfsBolt warcbolt = getWarcBolt("CC-NEWS");
+ WARCHdfsBolt warcbolt = getWarcBolt("CC-NEWS");
- // take it from feed default output so that the feed files themselves
- // don't get included - unless we want them too of course!
- builder.setBolt("warc", warcbolt, numWorkers).localOrShuffleGrouping("feed");
-
- final Fields furl = new Fields("url");
+ // take it from feed default output so that the feed files themselves
+ // don't get included - unless we want them too of course!
+ builder.setBolt("warc", warcbolt, numWorkers).localOrShuffleGrouping("feed");
- BoltDeclarer statusBolt = builder.setBolt("status", new StatusUpdaterBolt(), numWorkers)
- .fieldsGrouping("fetch", Constants.StatusStreamName, furl)
- .fieldsGrouping("sitemap", Constants.StatusStreamName, furl)
- .fieldsGrouping("feed", Constants.StatusStreamName, furl)
- .fieldsGrouping("ssb", Constants.StatusStreamName, furl)
- .fieldsGrouping("prefilter", Constants.StatusStreamName, furl);
-
- if (args.length >= 2) {
- statusBolt.customGrouping("filter", Constants.StatusStreamName, new URLStreamGrouping());
- }
- statusBolt.setNumTasks(numShards);
+ final Fields furl = new Fields("url");
- return submit(conf, builder);
+ BoltDeclarer statusBolt = builder.setBolt("status", new StatusUpdaterBolt(), numWorkers)
+ .fieldsGrouping("fetch", Constants.StatusStreamName, furl)
+ .fieldsGrouping("sitemap", Constants.StatusStreamName, furl)
+ .fieldsGrouping("feed", Constants.StatusStreamName, furl)
+ .fieldsGrouping("ssb", Constants.StatusStreamName, furl)
+ .fieldsGrouping("prefilter", Constants.StatusStreamName, furl);
+
+ if (args.length >= 2) {
+ statusBolt.customGrouping("filter", Constants.StatusStreamName, new URLStreamGrouping());
+ }
+ statusBolt.setNumTasks(numShards);
+
+ return submit(conf, builder);
}
protected WARCHdfsBolt getWarcBolt(String filePrefix) {
- // path is absolute
- String warcFilePath = ConfUtils.getString(getConf(), "warc.dir", "/data/warc");
-
- WARCFileNameFormat fileNameFormat = new WARCFileNameFormat();
- fileNameFormat.withPath(warcFilePath);
- fileNameFormat.withPrefix(filePrefix);
-
- Map
* {
* "class": "org.commoncrawl.stormcrawler.news.bootstrap.FeedLinkParseFilter",
@@ -41,12 +42,10 @@
*/
public class FeedLinkParseFilter extends LinkParseFilter {
- private static final org.slf4j.Logger LOG = LoggerFactory
- .getLogger(FeedLinkParseFilter.class);
+ private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(FeedLinkParseFilter.class);
@Override
- public void filter(String URL, byte[] content, DocumentFragment doc,
- ParseResult parse) {
+ public void filter(String URL, byte[] content, DocumentFragment doc, ParseResult parse) {
// skip existing links
logLinks(parse, URL, "Skipped links");
diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/NewsSiteMapDetectorBolt.java b/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/NewsSiteMapDetectorBolt.java
index 1160201..2e1011b 100644
--- a/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/NewsSiteMapDetectorBolt.java
+++ b/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/NewsSiteMapDetectorBolt.java
@@ -40,16 +40,14 @@
@SuppressWarnings("serial")
public class NewsSiteMapDetectorBolt extends SiteMapParserBolt {
- private static final org.slf4j.Logger LOG = LoggerFactory
- .getLogger(NewsSiteMapDetectorBolt.class);
+ private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(NewsSiteMapDetectorBolt.class);
protected static final int maxOffsetContentGuess = 1024;
- private static ContentDetector contentDetector = new ContentDetector(
- NewsSiteMapParserBolt.contentClues, maxOffsetContentGuess);
+ private static ContentDetector contentDetector = new ContentDetector(NewsSiteMapParserBolt.contentClues,
+ maxOffsetContentGuess);
private ParseFilter parseFilters;
-
@Override
public void execute(Tuple tuple) {
Metadata metadata = (Metadata) tuple.getValueByField("metadata");
@@ -57,10 +55,8 @@ public void execute(Tuple tuple) {
byte[] content = tuple.getBinaryByField("content");
String url = tuple.getStringByField("url");
- boolean isSitemap = Boolean.valueOf(
- metadata.getFirstValue(SiteMapParserBolt.isSitemapKey));
- boolean isNewsSitemap = Boolean.valueOf(
- metadata.getFirstValue(NewsSiteMapParserBolt.isSitemapNewsKey));
+ boolean isSitemap = Boolean.valueOf(metadata.getFirstValue(SiteMapParserBolt.isSitemapKey));
+ boolean isNewsSitemap = Boolean.valueOf(metadata.getFirstValue(NewsSiteMapParserBolt.isSitemapNewsKey));
if (!isNewsSitemap || !isSitemap) {
int match = contentDetector.getFirstMatch(content);
@@ -70,10 +66,8 @@ public void execute(Tuple tuple) {
metadata.setValue(SiteMapParserBolt.isSitemapKey, "true");
if (match <= NewsSiteMapParserBolt.contentCluesSitemapNewsMatchUpTo) {
isNewsSitemap = true;
- LOG.info("{} detected as news sitemap based on content",
- url);
- metadata.setValue(NewsSiteMapParserBolt.isSitemapNewsKey,
- "true");
+ LOG.info("{} detected as news sitemap based on content", url);
+ metadata.setValue(NewsSiteMapParserBolt.isSitemapNewsKey, "true");
}
}
}
@@ -85,8 +79,7 @@ public void execute(Tuple tuple) {
parseData.setMetadata(metadata);
parseFilters.filter(url, content, null, parse);
// emit status
- collector.emit(Constants.StatusStreamName, tuple,
- new Values(url, metadata, Status.FETCHED));
+ collector.emit(Constants.StatusStreamName, tuple, new Values(url, metadata, Status.FETCHED));
} else {
// pass on
collector.emit(tuple, tuple.getValues());
@@ -95,9 +88,8 @@ public void execute(Tuple tuple) {
}
@Override
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public void prepare(Map stormConf, TopologyContext context,
- OutputCollector collect) {
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collect) {
super.prepare(stormConf, context, collect);
parseFilters = ParseFilters.fromConf(stormConf);
}
diff --git a/src/test/java/org/commoncrawl/stormcrawler/FastURLFilterTest.java b/src/test/java/org/commoncrawl/stormcrawler/FastURLFilterTest.java
index 64df0ba..aa05f36 100644
--- a/src/test/java/org/commoncrawl/stormcrawler/FastURLFilterTest.java
+++ b/src/test/java/org/commoncrawl/stormcrawler/FastURLFilterTest.java
@@ -35,53 +35,53 @@ public class FastURLFilterTest {
@BeforeClass
public static void init() {
- filter = createFilter("fast-urlfilter.txt");
+ filter = createFilter("fast-urlfilter.txt");
}
public static FastURLFilter createFilter(String fileName) {
- ObjectNode filterParams = new ObjectNode(JsonNodeFactory.instance);
- filterParams.put("file", fileName);
- FastURLFilter filter = new FastURLFilter();
- Map
* Host www.example.org
* DenyPath /path/to/be/excluded
@@ -72,48 +66,45 @@
* Domain example.org
* DenyPathQuery /resource/.*?action=exclude
*
- *
- * Host rules are evaluated before Domain rules. For
- * Host rules the entire host name of a URL must match while the
- * domain names in Domain rules are considered as matches if the
- * domain is a suffix of the host name (consisting of complete host name parts).
- * Shorter domain suffixes are checked first, a single dot
- * "." as "domain name" can be used to specify
- * global rules applied to every URL.
- *
- * E.g., for "www.example.com" the rules given above are looked up in the
- * following order:
+ *
+ * Host rules are evaluated before Domain rules. For Host
+ * rules the entire host name of a URL must match while the domain names in Domain
+ * rules are considered as matches if the domain is a suffix of the host name (consisting of
+ * complete host name parts). Shorter domain suffixes are checked first, a single dot ".
+ * " as "domain name" can be used to specify global rules applied to every
+ * URL.
+ *
+ *
- *
- * The first matching rule will reject the URL and no further rules are checked.
- * If no rule matches the URL is accepted. URLs without a host name (e.g.,
- * Domain .")Domain .")
* file:/path/file.txt are checked for global rules only. URLs
- * which fail to be parsed as {@link java.net.URL} are always rejected.
- *
- * For rules either the URL path (DenyPath) or path and query
- * (DenyPathQuery) are checked whether the given
- * {@link java.util.regex Java Regular expression} is found (see
- * {@link java.util.regex.Matcher#find()}) in the URL path (and query).
- *
- * Rules are applied in the order of their definition. For better performance,
- * regular expressions which are simpler/faster or match more URLs should be
- * defined earlier.
- *
- * Comments in the rule file start with the # character and reach
- * until the end of the line.
- *
- * The rules file is defined via the property urlfilter.fast.file,
- * the default name is fast-urlfilter.txt.
+ *
+ * The first matching rule will reject the URL and no further rules are checked. If no rule matches
+ * the URL is accepted. URLs without a host name (e.g., file:/path/file.txt are checked
+ * for global rules only. URLs which fail to be parsed as {@link java.net.URL} are always rejected.
+ *
+ * DenyPath) or path and query (DenyPathQuery
+ * ) are checked whether the given {@link java.util.regex Java Regular expression} is found
+ * (see {@link java.util.regex.Matcher#find()}) in the URL path (and query).
+ *
+ * # character and reach until the end of
+ * the line.
+ *
+ * urlfilter.fast.file, the default name
+ * is fast-urlfilter.txt.
*/
public class FastURLFilter extends URLFilter implements JSONResource {
- protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ protected static final Logger LOG =
+ LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final String URLFILTER_FAST_FILE = "urlfilter.fast.file";
private Multimap
+ * @param clues nested list of literal clues. Outer list defines an OR-group, inner list
+ * contained ANDed clues required to match all, e.g. the following definition would match if
+ * either "clue1" and "and_clue2" are matched, or alternatively
+ * "or_clue3" is found
+ *
* { { clue1, and_clue2 }, { or_clue3 } }
*
*
@@ -54,11 +51,11 @@ public int getFirstMatch(byte[] content) {
if (content.length > maxOffset) {
beginning = Arrays.copyOfRange(content, 0, maxOffset);
}
- OR : for (int i = 0; i < clues.length; i++) {
+ OR:
+ for (int i = 0; i < clues.length; i++) {
byte[][] group = clues[i];
for (byte[] clue : group) {
- if (Bytes.indexOf(beginning, clue) == -1)
- continue OR;
+ if (Bytes.indexOf(beginning, clue) == -1) continue OR;
}
// success, all members of one group matched
return i;
@@ -69,5 +66,4 @@ public int getFirstMatch(byte[] content) {
public boolean matches(byte[] content) {
return (getFirstMatch(content) >= 0);
}
-
}
diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java b/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java
index 3c5d247..9368d2c 100644
--- a/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java
+++ b/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java
@@ -1,26 +1,22 @@
/**
- * Licensed to DigitalPebble Ltd under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * DigitalPebble licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to DigitalPebble Ltd under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership.
+ * DigitalPebble licenses this file to You under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License. You may obtain a copy of the
+ * License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ *
* {
* "class": "org.commoncrawl.stormcrawler.news.bootstrap.FeedLinkParseFilter",
@@ -59,11 +57,8 @@ public void filter(String URL, byte[] content, DocumentFragment doc, ParseResult
public static void logLinks(ParseResult parse, String URL, String message) {
if (LOG.isDebugEnabled() && parse.getOutlinks().size() > 0) {
- if (!message.isEmpty())
- LOG.debug("{} for {}:", message, URL);
- for (Outlink outlink : parse.getOutlinks())
- LOG.debug(outlink.getTargetURL());
+ if (!message.isEmpty()) LOG.debug("{} for {}:", message, URL);
+ for (Outlink outlink : parse.getOutlinks()) LOG.debug(outlink.getTargetURL());
}
}
-
}
diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/NewsSiteMapDetectorBolt.java b/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/NewsSiteMapDetectorBolt.java
index 2e1011b..f0af134 100644
--- a/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/NewsSiteMapDetectorBolt.java
+++ b/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/NewsSiteMapDetectorBolt.java
@@ -14,15 +14,10 @@
package org.commoncrawl.stormcrawler.news.bootstrap;
import java.util.Map;
-
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
-import org.commoncrawl.stormcrawler.news.ContentDetector;
-import org.commoncrawl.stormcrawler.news.NewsSiteMapParserBolt;
-import org.slf4j.LoggerFactory;
-
import org.apache.stormcrawler.Constants;
import org.apache.stormcrawler.Metadata;
import org.apache.stormcrawler.bolt.SiteMapParserBolt;
@@ -31,20 +26,23 @@
import org.apache.stormcrawler.parse.ParseFilters;
import org.apache.stormcrawler.parse.ParseResult;
import org.apache.stormcrawler.persistence.Status;
+import org.commoncrawl.stormcrawler.news.ContentDetector;
+import org.commoncrawl.stormcrawler.news.NewsSiteMapParserBolt;
+import org.slf4j.LoggerFactory;
/**
- * Detector for news
+ * Detector for news
* sitemaps and also sitemaps.
*/
@SuppressWarnings("serial")
public class NewsSiteMapDetectorBolt extends SiteMapParserBolt {
- private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(NewsSiteMapDetectorBolt.class);
+ private static final org.slf4j.Logger LOG =
+ LoggerFactory.getLogger(NewsSiteMapDetectorBolt.class);
protected static final int maxOffsetContentGuess = 1024;
- private static ContentDetector contentDetector = new ContentDetector(NewsSiteMapParserBolt.contentClues,
- maxOffsetContentGuess);
+ private static ContentDetector contentDetector =
+ new ContentDetector(NewsSiteMapParserBolt.contentClues, maxOffsetContentGuess);
private ParseFilter parseFilters;
@@ -56,7 +54,8 @@ public void execute(Tuple tuple) {
String url = tuple.getStringByField("url");
boolean isSitemap = Boolean.valueOf(metadata.getFirstValue(SiteMapParserBolt.isSitemapKey));
- boolean isNewsSitemap = Boolean.valueOf(metadata.getFirstValue(NewsSiteMapParserBolt.isSitemapNewsKey));
+ boolean isNewsSitemap =
+ Boolean.valueOf(metadata.getFirstValue(NewsSiteMapParserBolt.isSitemapNewsKey));
if (!isNewsSitemap || !isSitemap) {
int match = contentDetector.getFirstMatch(content);
@@ -79,7 +78,8 @@ public void execute(Tuple tuple) {
parseData.setMetadata(metadata);
parseFilters.filter(url, content, null, parse);
// emit status
- collector.emit(Constants.StatusStreamName, tuple, new Values(url, metadata, Status.FETCHED));
+ collector.emit(
+ Constants.StatusStreamName, tuple, new Values(url, metadata, Status.FETCHED));
} else {
// pass on
collector.emit(tuple, tuple.getValues());
@@ -93,5 +93,4 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll
super.prepare(stormConf, context, collect);
parseFilters = ParseFilters.fromConf(stormConf);
}
-
}
diff --git a/src/test/java/org/commoncrawl/stormcrawler/FastURLFilterTest.java b/src/test/java/org/commoncrawl/stormcrawler/FastURLFilterTest.java
index aa05f36..1931aca 100644
--- a/src/test/java/org/commoncrawl/stormcrawler/FastURLFilterTest.java
+++ b/src/test/java/org/commoncrawl/stormcrawler/FastURLFilterTest.java
@@ -14,21 +14,19 @@
*/
package org.commoncrawl.stormcrawler;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
-
+import org.apache.stormcrawler.Metadata;
+import org.apache.stormcrawler.filtering.URLFilter;
import org.commoncrawl.stormcrawler.filter.FastURLFilter;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.apache.stormcrawler.Metadata;
-import org.apache.stormcrawler.filtering.URLFilter;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
public class FastURLFilterTest {
protected static URLFilter filter;
diff --git a/src/test/java/org/commoncrawl/stormcrawler/news/NewsSiteMapParserTest.java b/src/test/java/org/commoncrawl/stormcrawler/news/NewsSiteMapParserTest.java
index db0fedd..b0a0d5a 100644
--- a/src/test/java/org/commoncrawl/stormcrawler/news/NewsSiteMapParserTest.java
+++ b/src/test/java/org/commoncrawl/stormcrawler/news/NewsSiteMapParserTest.java
@@ -16,6 +16,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
+import crawlercommons.sitemaps.UnknownFormatException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -25,17 +26,13 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
import org.apache.commons.io.IOUtils;
-import org.commoncrawl.stormcrawler.news.NewsSiteMapParserBolt.SitemapType;
-import org.junit.Before;
-import org.junit.Test;
-
import org.apache.stormcrawler.Metadata;
import org.apache.stormcrawler.parse.Outlink;
import org.apache.stormcrawler.parse.ParsingTester;
-
-import crawlercommons.sitemaps.UnknownFormatException;
+import org.commoncrawl.stormcrawler.news.NewsSiteMapParserBolt.SitemapType;
+import org.junit.Before;
+import org.junit.Test;
public class NewsSiteMapParserTest extends ParsingTester {
@@ -60,7 +57,8 @@ public void testSiteMapParser() throws IOException, UnknownFormatException {
SitemapType type = ((NewsSiteMapParserBolt) bolt).detectContent(url, content);
assertEquals(SitemapType.NEWS, type);
- ((NewsSiteMapParserBolt) bolt).parseSiteMap(url, content, contentType, parentMetadata, links);
+ ((NewsSiteMapParserBolt) bolt)
+ .parseSiteMap(url, content, contentType, parentMetadata, links);
// unmodified sitemap:
// - publication date is far in the past, link should be skipped
@@ -69,14 +67,17 @@ public void testSiteMapParser() throws IOException, UnknownFormatException {
// now set the publication date to yesterday
LocalDateTime yesterday = LocalDateTime.now().minusDays(1);
- content = (new String(content, StandardCharsets.UTF_8))
- .replace(
- "