From 4587a3cb919dcead9d1da39ade1c61cd31f73067 Mon Sep 17 00:00:00 2001 From: Luca Foppiano Date: Tue, 21 Apr 2026 14:24:41 +0100 Subject: [PATCH 01/10] add .gitignore --- .gitignore | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..37effdb --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +.idea +target +opensearchdata +warcdata +.java-version \ No newline at end of file From aadcd7cf3b5b436a3985479c1417e8b2bc98355b Mon Sep 17 00:00:00 2001 From: Luca Foppiano Date: Tue, 21 Apr 2026 14:25:20 +0100 Subject: [PATCH 02/10] docs: add JDK 17+ in the readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 4efbf08..6ac88e3 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ Crawler for news based on [StormCrawler](https://stormcrawler.apache.org/). Prod ## Prerequisites - +* JVM 17 or higher * Install OpenSearch 2.19.5 * Install Apache Storm 2.8.8 * Start OpenSearch and Storm From 3df63022b00f960d7241e955dc125077f74ee180 Mon Sep 17 00:00:00 2001 From: Luca Foppiano Date: Tue, 21 Apr 2026 14:33:36 +0100 Subject: [PATCH 03/10] chore: add spotless + rules --- eclipse-formatter.xml | 176 +++++++++++++++++++++++++++++++++++++ pom.xml | 200 +++++++++++++++++++++++------------------- 2 files changed, 288 insertions(+), 88 deletions(-) create mode 100644 eclipse-formatter.xml diff --git a/eclipse-formatter.xml b/eclipse-formatter.xml new file mode 100644 index 0000000..e9782b8 --- /dev/null +++ b/eclipse-formatter.xml @@ -0,0 +1,176 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/pom.xml b/pom.xml index 0508210..441c23c 100644 --- a/pom.xml +++ 
b/pom.xml @@ -1,5 +1,4 @@ - - - + 4.0.0 org.commoncrawl.stormcrawler.news crawler 3.6.0 jar + + https://github.com/commoncrawl/news-crawl Apache License, Version 2.0 @@ -35,8 +36,6 @@ under the License. - https://github.com/commoncrawl/news-crawl - UTF-8 3.6.0 @@ -49,87 +48,6 @@ under the License. 4.13.2 - - - - org.apache.maven.plugins - maven-compiler-plugin - 3.15.0 - - 17 - 17 - - - - org.codehaus.mojo - exec-maven-plugin - 3.6.3 - - - - exec - - - - - java - true - false - compile - - - - org.apache.maven.plugins - maven-shade-plugin - 3.6.2 - - - package - - shade - - - false - - - - org.apache.storm.flux.Flux - - - - - - - - - - *:* - - META-INF/*.SF - META-INF/*.DSA - META-INF/*.RSA - - - - - org.apache.storm:flux-core - - org/apache/commons/** - org/apache/http/** - org/yaml/** - - - - - - - - - - org.apache.stormcrawler @@ -207,4 +125,110 @@ under the License. test + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.15.0 + + 17 + 17 + + + + org.codehaus.mojo + exec-maven-plugin + 3.6.3 + + java + true + false + compile + + + + + exec + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.6.2 + + + + shade + + package + + false + + + + org.apache.storm.flux.Flux + + + + + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + org.apache.storm:flux-core + + org/apache/commons/** + org/apache/http/** + org/yaml/** + + + + + + + + + com.diffplug.spotless + spotless-maven-plugin + 2.46.1 + + + + + pom.xml + + + all + true + false + -1 + recommended_2008_06 + + + + + ${project.basedir}/eclipse-formatter.xml + + + + + + From 9be656e4ebcbec606c9b2d929655a830fce2ceac Mon Sep 17 00:00:00 2001 From: Luca Foppiano Date: Tue, 21 Apr 2026 14:33:47 +0100 Subject: [PATCH 04/10] chore: reformat code --- .../stormcrawler/filter/FastURLFilter.java | 498 +++++++++--------- .../stormcrawler/news/ContentDetector.java | 23 +- .../stormcrawler/news/CrawlTopology.java | 162 +++--- .../stormcrawler/news/FeedDetectorBolt.java | 25 +- 
.../news/NewsSiteMapParserBolt.java | 178 +++---- .../stormcrawler/news/PreFilterBolt.java | 114 ++-- .../news/PunycodeURLNormalizer.java | 6 +- .../news/bootstrap/BootstrapTopology.java | 39 +- .../news/bootstrap/FeedLinkParseFilter.java | 7 +- .../bootstrap/NewsSiteMapDetectorBolt.java | 28 +- .../stormcrawler/FastURLFilterTest.java | 72 +-- .../news/NewsSiteMapParserTest.java | 93 ++-- 12 files changed, 609 insertions(+), 636 deletions(-) diff --git a/src/main/java/org/commoncrawl/stormcrawler/filter/FastURLFilter.java b/src/main/java/org/commoncrawl/stormcrawler/filter/FastURLFilter.java index 7c52716..e0b8a7b 100644 --- a/src/main/java/org/commoncrawl/stormcrawler/filter/FastURLFilter.java +++ b/src/main/java/org/commoncrawl/stormcrawler/filter/FastURLFilter.java @@ -111,7 +111,7 @@ * The rules file is defined via the property urlfilter.fast.file, * the default name is fast-urlfilter.txt. */ -public class FastURLFilter extends URLFilter implements JSONResource { +public class FastURLFilter extends URLFilter implements JSONResource { protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -127,41 +127,41 @@ public class FastURLFilter extends URLFilter implements JSONResource { public void configure(@SuppressWarnings("rawtypes") Map stormConf, JsonNode filterParams) { - // read from conf first - int refreshRate = ConfUtils.getInt(stormConf, "fast.urlfilter.refresh", -1); - this.resourceFile = ConfUtils.getString(stormConf, "fast.urlfilter.file", null); - - // then from the param file (which needs recompiling in case of change) - if (filterParams != null) { - JsonNode node = filterParams.get("file"); - if (node != null && node.isTextual() && this.resourceFile == null) { - this.resourceFile = node.asText(); - } - node = filterParams.get("refresh"); - if (node != null && node.isInt() && refreshRate == -1) { - refreshRate = node.asInt(); - } - } - - try { - loadJSONResources(); - } catch (Exception e) { - 
LOG.error("Exception while loading resources", e); - } - - if (refreshRate != -1) { - LOG.info("Filter set to reload from {} every {} sec", getResourceFile(), refreshRate); - new Timer().schedule(new TimerTask() { - public void run() { - LOG.info("Reloading resources"); - try { - loadJSONResources(); - } catch (Exception e) { - LOG.error("Can't load resources", e); - } - } - }, refreshRate * 1000, refreshRate * 1000); - } + // read from conf first + int refreshRate = ConfUtils.getInt(stormConf, "fast.urlfilter.refresh", -1); + this.resourceFile = ConfUtils.getString(stormConf, "fast.urlfilter.file", null); + + // then from the param file (which needs recompiling in case of change) + if (filterParams != null) { + JsonNode node = filterParams.get("file"); + if (node != null && node.isTextual() && this.resourceFile == null) { + this.resourceFile = node.asText(); + } + node = filterParams.get("refresh"); + if (node != null && node.isInt() && refreshRate == -1) { + refreshRate = node.asInt(); + } + } + + try { + loadJSONResources(); + } catch (Exception e) { + LOG.error("Exception while loading resources", e); + } + + if (refreshRate != -1) { + LOG.info("Filter set to reload from {} every {} sec", getResourceFile(), refreshRate); + new Timer().schedule(new TimerTask() { + public void run() { + LOG.info("Reloading resources"); + try { + loadJSONResources(); + } catch (Exception e) { + LOG.error("Can't load resources", e); + } + } + }, refreshRate * 1000, refreshRate * 1000); + } } /** @@ -171,252 +171,256 @@ public void run() { **/ @Override public void loadJSONResources() throws Exception { - InputStream inputStream = null; - AmazonS3 s3client = null; - try { - if (getResourceFile().startsWith("s3://")) { - // try loading from S3 - s3client = AmazonS3ClientBuilder.standard().build(); - java.net.URI uri = new java.net.URI(getResourceFile()); - - String bucketName = uri.getHost(); - // remove the first "/" - String path = uri.getPath().substring(1); - - // optimisation - 
avoid a full reload if the resource has not changed - ObjectMetadata metadata = s3client.getObjectMetadata(bucketName, path); - final String ETAG = metadata.getETag(); - if (ETAG != null && ETAG.equals(resourceETAG)) { - LOG.info("Unchanged ETAG for {} - skipping reload", getResourceFile()); - return; - } else { - resourceETAG = ETAG; - } - - final S3Object object = s3client.getObject(new GetObjectRequest(bucketName, path)); - inputStream = object.getObjectContent(); - } else { - inputStream = getClass().getClassLoader().getResourceAsStream(getResourceFile()); - if (inputStream == null) { - LOG.error("Can't load conf from {}", getResourceFile()); - return; - } - } - if (getResourceFile().endsWith(".gz")) { - inputStream = new GZIPInputStream(inputStream); - } - - loadJSONResources(new BufferedInputStream(inputStream)); - } finally { - if (inputStream != null) { - inputStream.close(); - } - if (s3client != null) { - s3client.shutdown(); - } - } + InputStream inputStream = null; + AmazonS3 s3client = null; + try { + if (getResourceFile().startsWith("s3://")) { + // try loading from S3 + s3client = AmazonS3ClientBuilder.standard().build(); + java.net.URI uri = new java.net.URI(getResourceFile()); + + String bucketName = uri.getHost(); + // remove the first "/" + String path = uri.getPath().substring(1); + + // optimisation - avoid a full reload if the resource has not changed + ObjectMetadata metadata = s3client.getObjectMetadata(bucketName, path); + final String ETAG = metadata.getETag(); + if (ETAG != null && ETAG.equals(resourceETAG)) { + LOG.info("Unchanged ETAG for {} - skipping reload", getResourceFile()); + return; + } else { + resourceETAG = ETAG; + } + + final S3Object object = s3client.getObject(new GetObjectRequest(bucketName, path)); + inputStream = object.getObjectContent(); + } else { + inputStream = getClass().getClassLoader().getResourceAsStream(getResourceFile()); + if (inputStream == null) { + LOG.error("Can't load conf from {}", getResourceFile()); 
+ return; + } + } + if (getResourceFile().endsWith(".gz")) { + inputStream = new GZIPInputStream(inputStream); + } + + loadJSONResources(new BufferedInputStream(inputStream)); + } finally { + if (inputStream != null) { + inputStream.close(); + } + if (s3client != null) { + s3client.shutdown(); + } + } } @Override public void loadJSONResources(InputStream inputStream) - throws JsonParseException, JsonMappingException, IOException { - long start = System.currentTimeMillis(); - - try (Reader r = new InputStreamReader(inputStream)) { - reloadRules(r); - } - - long end = System.currentTimeMillis(); - LOG.info("Loaded {} hostrules and {} domain rules in {} msec from {}", hostRules.size(), domainRules.size(), - (end - start), resourceFile); + throws JsonParseException, JsonMappingException, IOException { + long start = System.currentTimeMillis(); + + try (Reader r = new InputStreamReader(inputStream)) { + reloadRules(r); + } + + long end = System.currentTimeMillis(); + LOG.info( + "Loaded {} hostrules and {} domain rules in {} msec from {}", + hostRules.size(), + domainRules.size(), + (end - start), + resourceFile); } @Override public String getResourceFile() { - return resourceFile; + return resourceFile; } @Override public String filter(URL sourceUrl, Metadata sourceMetadata, String urlToFilter) { - synchronized (this) { - URL u; - - try { - u = new URL(urlToFilter); - } catch (Exception e) { - LOG.debug("Rejected {} because failed to parse as URL: {}", urlToFilter, e.getMessage()); - return null; - } - - String hostname = u.getHost(); - - // first check for host-specific rules - for (Rule rule : hostRules.get(hostname)) { - if (rule.match(u)) { - return null; - } - } - - // also look up domain rules for host name - for (Rule rule : domainRules.get(hostname)) { - if (rule.match(u)) { - return null; - } - } - - // check suffixes of host name from longer to shorter: - // subdomains, domain, top-level domain - int start = 0; - int pos; - while ((pos = hostname.indexOf('.', 
start)) != -1) { - start = pos + 1; - String domain = hostname.substring(start); - for (Rule rule : domainRules.get(domain)) { - if (rule.match(u)) { - return null; - } - } - } - - // finally check "global" rules defined for `Domain .` - for (Rule rule : domainRules.get(".")) { - if (rule.match(u)) { - return null; - } - } - - // no reject rules found - return urlToFilter; - } + synchronized (this) { + URL u; + + try { + u = new URL(urlToFilter); + } catch (Exception e) { + LOG.debug("Rejected {} because failed to parse as URL: {}", urlToFilter, e.getMessage()); + return null; + } + + String hostname = u.getHost(); + + // first check for host-specific rules + for (Rule rule : hostRules.get(hostname)) { + if (rule.match(u)) { + return null; + } + } + + // also look up domain rules for host name + for (Rule rule : domainRules.get(hostname)) { + if (rule.match(u)) { + return null; + } + } + + // check suffixes of host name from longer to shorter: + // subdomains, domain, top-level domain + int start = 0; + int pos; + while ((pos = hostname.indexOf('.', start)) != -1) { + start = pos + 1; + String domain = hostname.substring(start); + for (Rule rule : domainRules.get(domain)) { + if (rule.match(u)) { + return null; + } + } + } + + // finally check "global" rules defined for `Domain .` + for (Rule rule : domainRules.get(".")) { + if (rule.match(u)) { + return null; + } + } + + // no reject rules found + return urlToFilter; + } } private void reloadRules(Reader rules) throws IOException { - synchronized (this) { - domainRules.clear(); - hostRules.clear(); - - BufferedReader reader = new BufferedReader(rules); - - String current = null; - boolean host = false; - int lineno = 0; - - String line; - try { - while ((line = reader.readLine()) != null) { - lineno++; - line = line.trim(); - - if (line.indexOf("#") != -1) { - // strip comments - line = line.substring(0, line.indexOf("#")).trim(); - } - - if (StringUtils.isBlank(line)) { - continue; - } - - if 
(line.startsWith("Host")) { - host = true; - current = line.split("\\s+")[1]; - } else if (line.startsWith("Domain")) { - host = false; - current = line.split("\\s+")[1]; - } else { - if (current == null) { - continue; - } - - Rule rule = null; - try { - if (CATCH_ALL_RULE.matcher(line).matches()) { - rule = DenyAllRule.getInstance(); - } else if (line.startsWith("DenyPathQuery")) { - rule = new DenyPathQueryRule(line.split("\\s+")[1]); - } else if (line.startsWith("DenyPath")) { - rule = new DenyPathRule(line.split("\\s+")[1]); - } else { - LOG.warn("Problem reading rule on line {}: {}", lineno, line); - continue; - } - } catch (Exception e) { - LOG.warn("Problem reading rule on line {}: {} - {}", lineno, line, e.getMessage()); - continue; - } - - if (host) { - LOG.trace("Adding host rule [{}] [{}]", current, rule); - hostRules.put(current, rule); - } else { - LOG.trace("Adding domain rule [{}] [{}]", current, rule); - domainRules.put(current, rule); - } - } - } - - } catch (IOException e) { - LOG.warn("Caught exception while reading rules file at line {}: {}", lineno, e.getMessage()); - throw e; - } - } + synchronized (this) { + domainRules.clear(); + hostRules.clear(); + + BufferedReader reader = new BufferedReader(rules); + + String current = null; + boolean host = false; + int lineno = 0; + + String line; + try { + while ((line = reader.readLine()) != null) { + lineno++; + line = line.trim(); + + if (line.indexOf("#") != -1) { + // strip comments + line = line.substring(0, line.indexOf("#")).trim(); + } + + if (StringUtils.isBlank(line)) { + continue; + } + + if (line.startsWith("Host")) { + host = true; + current = line.split("\\s+")[1]; + } else if (line.startsWith("Domain")) { + host = false; + current = line.split("\\s+")[1]; + } else { + if (current == null) { + continue; + } + + Rule rule = null; + try { + if (CATCH_ALL_RULE.matcher(line).matches()) { + rule = DenyAllRule.getInstance(); + } else if (line.startsWith("DenyPathQuery")) { + rule = new 
DenyPathQueryRule(line.split("\\s+")[1]); + } else if (line.startsWith("DenyPath")) { + rule = new DenyPathRule(line.split("\\s+")[1]); + } else { + LOG.warn("Problem reading rule on line {}: {}", lineno, line); + continue; + } + } catch (Exception e) { + LOG.warn("Problem reading rule on line {}: {} - {}", lineno, line, e.getMessage()); + continue; + } + + if (host) { + LOG.trace("Adding host rule [{}] [{}]", current, rule); + hostRules.put(current, rule); + } else { + LOG.trace("Adding domain rule [{}] [{}]", current, rule); + domainRules.put(current, rule); + } + } + } + + } catch (IOException e) { + LOG.warn("Caught exception while reading rules file at line {}: {}", lineno, e.getMessage()); + throw e; + } + } } public static class Rule { - protected Pattern pattern; + protected Pattern pattern; - Rule() { - } + Rule() { + } - public Rule(String regex) { - pattern = Pattern.compile(regex); - } + public Rule(String regex) { + pattern = Pattern.compile(regex); + } - public boolean match(URL url) { - return pattern.matcher(url.toString()).find(); - } + public boolean match(URL url) { + return pattern.matcher(url.toString()).find(); + } - public String toString() { - return pattern.toString(); - } + public String toString() { + return pattern.toString(); + } } public static class DenyPathRule extends Rule { - public DenyPathRule(String regex) { - super(regex); - } - - public boolean match(URL url) { - String haystack = url.getPath(); - return pattern.matcher(haystack).find(); - } + public DenyPathRule(String regex) { + super(regex); + } + + public boolean match(URL url) { + String haystack = url.getPath(); + return pattern.matcher(haystack).find(); + } } /** Rule for DenyPath .* or DenyPath .? 
*/ public static class DenyAllRule extends Rule { - private static Rule instance = new DenyAllRule("."); + private static Rule instance = new DenyAllRule("."); - private DenyAllRule(String regex) { - super(regex); - } + private DenyAllRule(String regex) { + super(regex); + } - public static Rule getInstance() { - return instance; - } + public static Rule getInstance() { + return instance; + } - public boolean match(URL url) { - return true; - } + public boolean match(URL url) { + return true; + } } public static class DenyPathQueryRule extends Rule { - public DenyPathQueryRule(String regex) { - super(regex); - } - - public boolean match(URL url) { - String haystack = url.getFile(); - return pattern.matcher(haystack).find(); - } + public DenyPathQueryRule(String regex) { + super(regex); + } + + public boolean match(URL url) { + String haystack = url.getFile(); + return pattern.matcher(haystack).find(); + } } } diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/ContentDetector.java b/src/main/java/org/commoncrawl/stormcrawler/news/ContentDetector.java index 1c3a4a5..7305195 100644 --- a/src/main/java/org/commoncrawl/stormcrawler/news/ContentDetector.java +++ b/src/main/java/org/commoncrawl/stormcrawler/news/ContentDetector.java @@ -27,19 +27,17 @@ public class ContentDetector { * Set up detector to detect content sniffing for a set of clue strings in a * prefix of the binary content. * - * @param clues - * nested list of literal clues. Outer list defines an OR-group, - * inner list contained ANDed clues required to match all, e.g. - * the following definition would match if either - * "clue1" and "and_clue2" are matched, or - * alternatively "or_clue3" is found + * @param clues nested list of literal clues. Outer list defines an + * OR-group, inner list contained ANDed clues required to match + * all, e.g. the following definition would match if either + * "clue1" and "and_clue2" are matched, or + * alternatively "or_clue3" is found * - *
-     *            { { clue1, and_clue2 }, { or_clue3 } }
-     *            
+ *
+     *                  { { clue1, and_clue2 }, { or_clue3 } }
+     *                  
* - * @param maxOffset - * max. offset of content prefix checked for clues + * @param maxOffset max. offset of content prefix checked for clues */ public ContentDetector(String[][] clues, int maxOffset) { this.maxOffset = maxOffset; @@ -56,8 +54,7 @@ public int getFirstMatch(byte[] content) { if (content.length > maxOffset) { beginning = Arrays.copyOfRange(content, 0, maxOffset); } - OR: - for (int i = 0; i < clues.length; i++) { + OR : for (int i = 0; i < clues.length; i++) { byte[][] group = clues[i]; for (byte[] clue : group) { if (Bytes.indexOf(beginning, clue) == -1) diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java b/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java index 3a3c018..3c5d247 100644 --- a/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java +++ b/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java @@ -51,108 +51,110 @@ public class CrawlTopology extends ConfigurableTopology { private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(CrawlTopology.class); public static void main(String[] args) throws Exception { - ConfigurableTopology.start(new CrawlTopology(), args); + ConfigurableTopology.start(new CrawlTopology(), args); } @Override protected int run(String[] args) { - TopologyBuilder builder = new TopologyBuilder(); + TopologyBuilder builder = new TopologyBuilder(); - int numWorkers = ConfUtils.getInt(getConf(), "topology.workers", 1); + int numWorkers = ConfUtils.getInt(getConf(), "topology.workers", 1); - // set to the real number of shards ONLY if es.status.routing is set to - // true in the configuration - int numShards = 16; + // set to the real number of shards ONLY if es.status.routing is set to + // true in the configuration + int numShards = 16; - if (args.length >= 2) { - // arguments include seed directory and file pattern - LOG.info("Injecting seeds from {} by pattern {}", args[0], args[1]); - builder.setSpout("filespout", new FileSpout(args[0], 
args[1], true)); - Fields key = new Fields("url"); + if (args.length >= 2) { + // arguments include seed directory and file pattern + LOG.info("Injecting seeds from {} by pattern {}", args[0], args[1]); + builder.setSpout("filespout", new FileSpout(args[0], args[1], true)); + Fields key = new Fields("url"); - builder.setBolt("filter", new URLFilterBolt()).fieldsGrouping("filespout", Constants.StatusStreamName, key); - } + builder.setBolt("filter", new URLFilterBolt()).fieldsGrouping("filespout", Constants.StatusStreamName, key); + } - builder.setSpout("spout", new AggregationSpout(), numShards); + builder.setSpout("spout", new AggregationSpout(), numShards); - builder.setBolt("prefilter", new PreFilterBolt("pre-urlfilters.json"), numWorkers).shuffleGrouping("spout"); + builder.setBolt("prefilter", new PreFilterBolt("pre-urlfilters.json"), numWorkers).shuffleGrouping("spout"); - builder.setBolt("partitioner", new URLPartitionerBolt(), numWorkers).shuffleGrouping("prefilter"); + builder.setBolt("partitioner", new URLPartitionerBolt(), numWorkers).shuffleGrouping("prefilter"); - builder.setBolt("fetch", new FetcherBolt(), numWorkers).fieldsGrouping("partitioner", new Fields("key")); + builder.setBolt("fetch", new FetcherBolt(), numWorkers).fieldsGrouping("partitioner", new Fields("key")); - builder.setBolt("sitemap", new NewsSiteMapParserBolt(), numWorkers).setNumTasks(2) - .localOrShuffleGrouping("fetch"); + builder.setBolt("sitemap", new NewsSiteMapParserBolt(), numWorkers) + .setNumTasks(2) + .localOrShuffleGrouping("fetch"); - builder.setBolt("feed", new FeedParserBolt(), numWorkers).setNumTasks(4).localOrShuffleGrouping("sitemap"); + builder.setBolt("feed", new FeedParserBolt(), numWorkers).setNumTasks(4).localOrShuffleGrouping("sitemap"); - // don't need to parse the pages but need to update their status - builder.setBolt("ssb", new DummyIndexer(), numWorkers).localOrShuffleGrouping("feed"); + // don't need to parse the pages but need to update their status + 
builder.setBolt("ssb", new DummyIndexer(), numWorkers).localOrShuffleGrouping("feed"); - WARCHdfsBolt warcbolt = getWarcBolt("CC-NEWS"); + WARCHdfsBolt warcbolt = getWarcBolt("CC-NEWS"); - // take it from feed default output so that the feed files themselves - // don't get included - unless we want them too of course! - builder.setBolt("warc", warcbolt, numWorkers).localOrShuffleGrouping("feed"); - - final Fields furl = new Fields("url"); + // take it from feed default output so that the feed files themselves + // don't get included - unless we want them too of course! + builder.setBolt("warc", warcbolt, numWorkers).localOrShuffleGrouping("feed"); - BoltDeclarer statusBolt = builder.setBolt("status", new StatusUpdaterBolt(), numWorkers) - .fieldsGrouping("fetch", Constants.StatusStreamName, furl) - .fieldsGrouping("sitemap", Constants.StatusStreamName, furl) - .fieldsGrouping("feed", Constants.StatusStreamName, furl) - .fieldsGrouping("ssb", Constants.StatusStreamName, furl) - .fieldsGrouping("prefilter", Constants.StatusStreamName, furl); - - if (args.length >= 2) { - statusBolt.customGrouping("filter", Constants.StatusStreamName, new URLStreamGrouping()); - } - statusBolt.setNumTasks(numShards); + final Fields furl = new Fields("url"); - return submit(conf, builder); + BoltDeclarer statusBolt = builder.setBolt("status", new StatusUpdaterBolt(), numWorkers) + .fieldsGrouping("fetch", Constants.StatusStreamName, furl) + .fieldsGrouping("sitemap", Constants.StatusStreamName, furl) + .fieldsGrouping("feed", Constants.StatusStreamName, furl) + .fieldsGrouping("ssb", Constants.StatusStreamName, furl) + .fieldsGrouping("prefilter", Constants.StatusStreamName, furl); + + if (args.length >= 2) { + statusBolt.customGrouping("filter", Constants.StatusStreamName, new URLStreamGrouping()); + } + statusBolt.setNumTasks(numShards); + + return submit(conf, builder); } protected WARCHdfsBolt getWarcBolt(String filePrefix) { - // path is absolute - String warcFilePath = 
ConfUtils.getString(getConf(), "warc.dir", "/data/warc"); - - WARCFileNameFormat fileNameFormat = new WARCFileNameFormat(); - fileNameFormat.withPath(warcFilePath); - fileNameFormat.withPrefix(filePrefix); - - Map fields = new LinkedHashMap<>(); - fields.put("software", "StormCrawler 2.10 https://stormcrawler.net/"); - fields.put("description", "News crawl for Common Crawl"); - String userAgent = AbstractHttpProtocol.getAgentString(getConf()); - fields.put("http-header-user-agent", userAgent); - fields.put("http-header-from", ConfUtils.getString(getConf(), "http.agent.email")); - String robotsTxtParser = "checked by crawler-commons " + crawlercommons.CrawlerCommons.getVersion() - + " (https://github.com/crawler-commons/crawler-commons)"; - fields.put("robots", robotsTxtParser); - fields.put("format", "WARC File Format 1.1"); - fields.put("conformsTo", "https://iipc.github.io/warc-specifications/specifications/warc-format/warc-1.1/"); - - WARCHdfsBolt warcbolt = (WARCHdfsBolt) new WARCHdfsBolt(); - warcbolt.withConfigKey("warc"); - warcbolt.withFileNameFormat(fileNameFormat); - warcbolt.withHeader(fields); - warcbolt.withRequestRecords(); - - // use RawLocalFileSystem (instead of ChecksumFileSystem) to avoid that - // WARC files are truncated if the topology is stopped because of a - // delayed sync of the default ChecksumFileSystem - Map hdfsConf = new HashMap<>(); - hdfsConf.put("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem"); - getConf().put("warc", hdfsConf); - - // will rotate if reaches size or time limit - int maxMB = ConfUtils.getInt(getConf(), "warc.rotation.policy.max-mb", 1024); - int maxMinutes = ConfUtils.getInt(getConf(), "warc.rotation.policy.max-minutes", 1440); - FileTimeSizeRotationPolicy rotpol = new FileTimeSizeRotationPolicy(maxMB, Units.MB); - rotpol.setTimeRotationInterval(maxMinutes, FileTimeSizeRotationPolicy.TimeUnit.MINUTES); - warcbolt.withRotationPolicy(rotpol); - - return warcbolt; + // path is absolute + String warcFilePath 
= ConfUtils.getString(getConf(), "warc.dir", "/data/warc"); + + WARCFileNameFormat fileNameFormat = new WARCFileNameFormat(); + fileNameFormat.withPath(warcFilePath); + fileNameFormat.withPrefix(filePrefix); + + Map fields = new LinkedHashMap<>(); + fields.put("software", "StormCrawler 2.10 https://stormcrawler.net/"); + fields.put("description", "News crawl for Common Crawl"); + String userAgent = AbstractHttpProtocol.getAgentString(getConf()); + fields.put("http-header-user-agent", userAgent); + fields.put("http-header-from", ConfUtils.getString(getConf(), "http.agent.email")); + String robotsTxtParser = "checked by crawler-commons " + + crawlercommons.CrawlerCommons.getVersion() + + " (https://github.com/crawler-commons/crawler-commons)"; + fields.put("robots", robotsTxtParser); + fields.put("format", "WARC File Format 1.1"); + fields.put("conformsTo", "https://iipc.github.io/warc-specifications/specifications/warc-format/warc-1.1/"); + + WARCHdfsBolt warcbolt = (WARCHdfsBolt) new WARCHdfsBolt(); + warcbolt.withConfigKey("warc"); + warcbolt.withFileNameFormat(fileNameFormat); + warcbolt.withHeader(fields); + warcbolt.withRequestRecords(); + + // use RawLocalFileSystem (instead of ChecksumFileSystem) to avoid that + // WARC files are truncated if the topology is stopped because of a + // delayed sync of the default ChecksumFileSystem + Map hdfsConf = new HashMap<>(); + hdfsConf.put("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem"); + getConf().put("warc", hdfsConf); + + // will rotate if reaches size or time limit + int maxMB = ConfUtils.getInt(getConf(), "warc.rotation.policy.max-mb", 1024); + int maxMinutes = ConfUtils.getInt(getConf(), "warc.rotation.policy.max-minutes", 1440); + FileTimeSizeRotationPolicy rotpol = new FileTimeSizeRotationPolicy(maxMB, Units.MB); + rotpol.setTimeRotationInterval(maxMinutes, FileTimeSizeRotationPolicy.TimeUnit.MINUTES); + warcbolt.withRotationPolicy(rotpol); + + return warcbolt; } } diff --git 
a/src/main/java/org/commoncrawl/stormcrawler/news/FeedDetectorBolt.java b/src/main/java/org/commoncrawl/stormcrawler/news/FeedDetectorBolt.java index c365525..6fed661 100644 --- a/src/main/java/org/commoncrawl/stormcrawler/news/FeedDetectorBolt.java +++ b/src/main/java/org/commoncrawl/stormcrawler/news/FeedDetectorBolt.java @@ -35,22 +35,16 @@ @SuppressWarnings("serial") public class FeedDetectorBolt extends FeedParserBolt { - private static final org.slf4j.Logger LOG = LoggerFactory - .getLogger(FeedDetectorBolt.class); + private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(FeedDetectorBolt.class); - public static final String[] mimeTypeClues = { - "rss+xml", "atom+xml", "text/rss" - }; + public static final String[] mimeTypeClues = {"rss+xml", "atom+xml", "text/rss"}; - public static String[][] contentClues = { { "<{}> for {}", - ct, url); + LOG.info("Feed detected from content type <{}> for {}", ct, url); break; } } @@ -90,8 +83,7 @@ public void execute(Tuple tuple) { parseData.setMetadata(metadata); parseFilters.filter(url, content, null, parse); // emit status - collector.emit(Constants.StatusStreamName, tuple, - new Values(url, metadata, Status.FETCHED)); + collector.emit(Constants.StatusStreamName, tuple, new Values(url, metadata, Status.FETCHED)); } else { // pass on collector.emit(tuple, tuple.getValues()); @@ -100,9 +92,8 @@ public void execute(Tuple tuple) { } @Override - @SuppressWarnings({ "rawtypes" }) - public void prepare(Map stormConf, TopologyContext context, - OutputCollector collect) { + @SuppressWarnings({"rawtypes"}) + public void prepare(Map stormConf, TopologyContext context, OutputCollector collect) { super.prepare(stormConf, context, collect); parseFilters = ParseFilters.fromConf(stormConf); } diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/NewsSiteMapParserBolt.java b/src/main/java/org/commoncrawl/stormcrawler/news/NewsSiteMapParserBolt.java index 3c4cf55..a7974fe 100644 --- 
a/src/main/java/org/commoncrawl/stormcrawler/news/NewsSiteMapParserBolt.java +++ b/src/main/java/org/commoncrawl/stormcrawler/news/NewsSiteMapParserBolt.java @@ -59,7 +59,6 @@ import crawlercommons.sitemaps.extension.LinkAttributes; import crawlercommons.sitemaps.extension.NewsAttributes; - /** * ParserBolt for news @@ -68,13 +67,13 @@ @SuppressWarnings("serial") public class NewsSiteMapParserBolt extends SiteMapParserBolt { // TODO: - // this is a modified copy of c.d.s.bolt.SiteMapParserBolt - // - make parent class extensible and overridable - // modifications: - // - detect and process only Google news sitemaps - // - or a sitemapindex because some subsitemaps may - // be news sitemaps - // - pass "isSitemapNews" to status metadata + // this is a modified copy of c.d.s.bolt.SiteMapParserBolt + // - make parent class extensible and overridable + // modifications: + // - detect and process only Google news sitemaps + // - or a sitemapindex because some subsitemaps may + // be news sitemaps + // - pass "isSitemapNews" to status metadata public static enum SitemapType { NEWS, INDEX, SITEMAP @@ -83,14 +82,13 @@ public static enum SitemapType { public static final String isSitemapNewsKey = "isSitemapNews"; public static final String isSitemapIndexKey = "isSitemapIndex"; /** - * A sitemap (not necessarily a news sitemap) which is verified to contain - * links to news articles. Necessary to crawl news sites which provide a - * sitemap but neither a news feed or sitemap. + * A sitemap (not necessarily a news sitemap) which is verified to contain links + * to news articles. Necessary to crawl news sites which provide a sitemap but + * neither a news feed or sitemap. 
*/ public static final String isSitemapVerifiedKey = "isSitemapVerified"; - private static final org.slf4j.Logger LOG = LoggerFactory - .getLogger(NewsSiteMapParserBolt.class); + private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(NewsSiteMapParserBolt.class); /* content clues for news sitemaps, sitemap indexes or any sitemaps */ public static String[][] contentClues; @@ -140,14 +138,10 @@ public void execute(Tuple tuple) { byte[] content = tuple.getBinaryByField("content"); String url = tuple.getStringByField("url"); - boolean isSitemap = Boolean.valueOf( - metadata.getFirstValue(SiteMapParserBolt.isSitemapKey)); - boolean isNewsSitemap = Boolean - .valueOf(metadata.getFirstValue(isSitemapNewsKey)); - boolean isSitemapIndex = Boolean - .valueOf(metadata.getFirstValue(isSitemapIndexKey)); - boolean isSitemapVerified = Boolean - .valueOf(metadata.getFirstValue(isSitemapVerifiedKey)); + boolean isSitemap = Boolean.valueOf(metadata.getFirstValue(SiteMapParserBolt.isSitemapKey)); + boolean isNewsSitemap = Boolean.valueOf(metadata.getFirstValue(isSitemapNewsKey)); + boolean isSitemapIndex = Boolean.valueOf(metadata.getFirstValue(isSitemapIndexKey)); + boolean isSitemapVerified = Boolean.valueOf(metadata.getFirstValue(isSitemapVerifiedKey)); if (sniffContent) { SitemapType type = detectContent(url, content); @@ -183,15 +177,14 @@ public void execute(Tuple tuple) { if (isNewsSitemap || isSitemapIndex || isSitemapVerified) { /* - * remove the isSitemap key from metadata to avoid that the default - * sitemap fetch interval is applied to news sitemaps, sitemap - * indexes and verified sitemaps + * remove the isSitemap key from metadata to avoid that the default sitemap + * fetch interval is applied to news sitemaps, sitemap indexes and verified + * sitemaps */ metadata.remove(isSitemapKey); } else { if (isSitemap) { - collector.emit(Constants.StatusStreamName, tuple, - new Values(url, metadata, Status.FETCHED)); + collector.emit(Constants.StatusStreamName, 
tuple, new Values(url, metadata, Status.FETCHED)); } else { // not a sitemap, just pass it on collector.emit(tuple, tuple.getValues()); @@ -217,8 +210,7 @@ public void execute(Tuple tuple) { metadata.setValue(Constants.STATUS_ERROR_SOURCE, "sitemap parsing"); metadata.setValue(Constants.STATUS_ERROR_MESSAGE, errorMessage); metadata.remove("numLinks"); - collector.emit(Constants.StatusStreamName, tuple, new Values(url, - metadata, Status.ERROR)); + collector.emit(Constants.StatusStreamName, tuple, new Values(url, metadata, Status.ERROR)); collector.ack(tuple); return; } @@ -232,15 +224,12 @@ public void execute(Tuple tuple) { parseFilters.filter(url, content, null, parse); } catch (RuntimeException e) { - String errorMessage = "Exception while running parse filters on " - + url + ": " + e; + String errorMessage = "Exception while running parse filters on " + url + ": " + e; LOG.error(errorMessage); - metadata.setValue(Constants.STATUS_ERROR_SOURCE, - "content filtering"); + metadata.setValue(Constants.STATUS_ERROR_SOURCE, "content filtering"); metadata.setValue(Constants.STATUS_ERROR_MESSAGE, errorMessage); metadata.remove("numLinks"); - collector.emit(StatusStreamName, tuple, new Values(url, metadata, - Status.ERROR)); + collector.emit(StatusStreamName, tuple, new Values(url, metadata, Status.ERROR)); collector.ack(tuple); return; } @@ -263,8 +252,7 @@ public void execute(Tuple tuple) { ol.getMetadata().setValue(isSitemapVerifiedKey, "true"); } } - Values v = new Values(ol.getTargetURL(), ol.getMetadata(), - Status.DISCOVERED); + Values v = new Values(ol.getTargetURL(), ol.getMetadata(), Status.DISCOVERED); collector.emit(Constants.StatusStreamName, tuple, v); } @@ -272,8 +260,7 @@ public void execute(Tuple tuple) { metadata.setValue("numLinks", String.valueOf(outlinks.size())); // marking the main URL as successfully fetched - collector.emit(Constants.StatusStreamName, tuple, new Values(url, - metadata, Status.FETCHED)); + 
collector.emit(Constants.StatusStreamName, tuple, new Values(url, metadata, Status.FETCHED)); collector.ack(tuple); } @@ -291,12 +278,10 @@ public SitemapType detectContent(String url, byte[] content) { if (match >= 0) { // a sitemap, need to detect type of sitemap if (match <= contentCluesSitemapNewsMatchUpTo) { - LOG.info("{} detected as news sitemap based on content", - url); + LOG.info("{} detected as news sitemap based on content", url); return SitemapType.NEWS; } else if (match <= contentCluesSitemapIndexMatchUpTo) { - LOG.info("{} detected as sitemap index based on content", - url); + LOG.info("{} detected as sitemap index based on content", url); return SitemapType.INDEX; } else { return SitemapType.SITEMAP; @@ -317,12 +302,14 @@ private boolean recentlyModified(Date lastModified) { return true; } - protected AbstractSiteMap parseSiteMap(String url, byte[] content, - String contentType, Metadata parentMetadata, List links) - throws UnknownFormatException, IOException { + protected AbstractSiteMap parseSiteMap( + String url, + byte[] content, + String contentType, + Metadata parentMetadata, + List links) throws UnknownFormatException, IOException { - SiteMapParser parser = new SiteMapParser(strictModeSitemaps, - allowPartialSitemaps); + SiteMapParser parser = new SiteMapParser(strictModeSitemaps, allowPartialSitemaps); parser.setStrictNamespace(true); parser.addAcceptedNamespace(Namespace.SITEMAP_LEGACY); parser.addAcceptedNamespace(Namespace.EMPTY); @@ -334,8 +321,7 @@ protected AbstractSiteMap parseSiteMap(String url, byte[] content, long start = System.currentTimeMillis(); AbstractSiteMap siteMap; // let the parser guess what the mimetype is - if (StringUtils.isBlank(contentType) - || contentType.contains("octet-stream")) { + if (StringUtils.isBlank(contentType) || contentType.contains("octet-stream")) { siteMap = parser.parseSiteMap(content, sURL); } else { siteMap = parser.parseSiteMap(contentType, content, sURL); @@ -351,8 +337,8 @@ protected 
AbstractSiteMap parseSiteMap(String url, byte[] content, Collection subsitemaps = smi.getSitemaps(); int delay = 0; /* - * keep the subsitemaps as outlinks they will be fetched and parsed - * in the following steps + * keep the subsitemaps as outlinks they will be fetched and parsed in the + * following steps */ Iterator iter = subsitemaps.iterator(); while (iter.hasNext()) { @@ -365,13 +351,20 @@ protected AbstractSiteMap parseSiteMap(String url, byte[] content, linksSkippedNotRecentlyModified++; LOG.debug( "{} has a modified date {} which is more than {} hours old", - target, lastModified.toString(), + target, + lastModified.toString(), filterHoursSinceModified); continue; } - Outlink ol = filterOutlink(sURL, target, parentMetadata, - isSitemapKey, "true", isSitemapNewsKey, "false"); + Outlink ol = filterOutlink( + sURL, + target, + parentMetadata, + isSitemapKey, + "true", + isSitemapNewsKey, + "false"); if (ol == null) { continue; } @@ -379,9 +372,7 @@ protected AbstractSiteMap parseSiteMap(String url, byte[] content, // add a delay if (this.scheduleSitemapsWithDelay > 0) { if (delay > 0) { - ol.getMetadata().setValue( - DefaultScheduler.DELAY_METADATA, - Integer.toString(delay)); + ol.getMetadata().setValue(DefaultScheduler.DELAY_METADATA, Integer.toString(delay)); } delay += this.scheduleSitemapsWithDelay; } @@ -389,15 +380,18 @@ protected AbstractSiteMap parseSiteMap(String url, byte[] content, links.add(ol); LOG.debug("{} : [sitemap] {}", url, target); } - LOG.info("Sitemap index (found {} sitemaps, {} skipped): {}", - linksFound, linksSkippedNotRecentlyModified, url); + LOG.info( + "Sitemap index (found {} sitemaps, {} skipped): {}", + linksFound, + linksSkippedNotRecentlyModified, + url); } // sitemap files else { SiteMap sm = (SiteMap) siteMap; Collection sitemapURLs = sm.getSiteMapUrls(); Iterator iter = sitemapURLs.iterator(); - sitemap_urls: while (iter.hasNext()) { + sitemap_urls : while (iter.hasNext()) { linksFound++; SiteMapURL smurl = 
iter.next(); // TODO handle priority in metadata @@ -414,11 +408,12 @@ protected AbstractSiteMap parseSiteMap(String url, byte[] content, linksSkippedNotRecentlyModified++; LOG.debug( "{} has a modified date {} which is more than {} hours old", - target, lastModified, filterHoursSinceModified); + target, + lastModified, + filterHoursSinceModified); continue; } - ExtensionMetadata[] newsAttrs = smurl - .getAttributesForExtension(Extension.NEWS); + ExtensionMetadata[] newsAttrs = smurl.getAttributesForExtension(Extension.NEWS); if (newsAttrs != null) { // filter based on news publication date // 2008-12-23 @@ -429,7 +424,9 @@ protected AbstractSiteMap parseSiteMap(String url, byte[] content, linksSkippedNotRecentlyModified++; LOG.debug( "{} has a news publication date {} which is more than {} hours old", - target, pubDate, filterHoursSinceModified); + target, + pubDate, + filterHoursSinceModified); continue sitemap_urls; } } @@ -437,8 +434,7 @@ protected AbstractSiteMap parseSiteMap(String url, byte[] content, } // add alternative language links - ExtensionMetadata[] linkAttrs = smurl - .getAttributesForExtension(Extension.LINKS); + ExtensionMetadata[] linkAttrs = smurl.getAttributesForExtension(Extension.LINKS); if (linkAttrs != null) { for (ExtensionMetadata attr : linkAttrs) { LinkAttributes linkAttr = (LinkAttributes) attr; @@ -451,17 +447,28 @@ protected AbstractSiteMap parseSiteMap(String url, byte[] content, // skip href links duplicating sitemap URL continue; } - Outlink ol = filterOutlink(sURL, hrefStr, - parentMetadata, isSitemapKey, "false", - isSitemapNewsKey, "false"); + Outlink ol = filterOutlink( + sURL, + hrefStr, + parentMetadata, + isSitemapKey, + "false", + isSitemapNewsKey, + "false"); if (ol != null) { links.add(ol); } } } - Outlink ol = filterOutlink(sURL, target, parentMetadata, - isSitemapKey, "false", isSitemapNewsKey, "false"); + Outlink ol = filterOutlink( + sURL, + target, + parentMetadata, + isSitemapKey, + "false", + isSitemapNewsKey, + 
"false"); if (ol == null) { continue; } @@ -469,34 +476,27 @@ protected AbstractSiteMap parseSiteMap(String url, byte[] content, links.add(ol); LOG.debug("{} : [sitemap] {}", url, target); } - LOG.info("Sitemap (found {} links, {} skipped): {}", linksFound, - linksSkippedNotRecentlyModified, url); + LOG.info("Sitemap (found {} links, {} skipped): {}", linksFound, linksSkippedNotRecentlyModified, url); } return siteMap; } @Override - @SuppressWarnings({ "rawtypes", "unchecked" }) - public void prepare(Map stormConf, TopologyContext context, - OutputCollector collector) { + @SuppressWarnings({"rawtypes", "unchecked"}) + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { super.prepare(stormConf, context, collector); - sniffContent = ConfUtils.getBoolean(stormConf, - "sitemap.sniffContent", false); - filterHoursSinceModified = ConfUtils.getInt(stormConf, - "sitemap.filter.hours.since.modified", -1); + sniffContent = ConfUtils.getBoolean(stormConf, "sitemap.sniffContent", false); + filterHoursSinceModified = ConfUtils.getInt(stormConf, "sitemap.filter.hours.since.modified", -1); parseFilters = ParseFilters.fromConf(stormConf); - int maxOffsetGuess = ConfUtils.getInt(stormConf, "sitemap.offset.guess", - 1024); - contentDetector = new ContentDetector( - NewsSiteMapParserBolt.contentClues, maxOffsetGuess); - rssContentDetector = new ContentDetector( - FeedDetectorBolt.contentClues, maxOffsetGuess); + int maxOffsetGuess = ConfUtils.getInt(stormConf, "sitemap.offset.guess", 1024); + contentDetector = new ContentDetector(NewsSiteMapParserBolt.contentClues, maxOffsetGuess); + rssContentDetector = new ContentDetector(FeedDetectorBolt.contentClues, maxOffsetGuess); averagedMetrics = context.registerMetric( "news_sitemap_average_processing_time", - new ReducedMetric(new MeanReducer()), 30); - scheduleSitemapsWithDelay = ConfUtils.getInt(stormConf, - "sitemap.schedule.delay", scheduleSitemapsWithDelay); + new ReducedMetric(new MeanReducer()), 
+ 30); + scheduleSitemapsWithDelay = ConfUtils.getInt(stormConf, "sitemap.schedule.delay", scheduleSitemapsWithDelay); } } diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/PreFilterBolt.java b/src/main/java/org/commoncrawl/stormcrawler/news/PreFilterBolt.java index b986506..d4289c3 100644 --- a/src/main/java/org/commoncrawl/stormcrawler/news/PreFilterBolt.java +++ b/src/main/java/org/commoncrawl/stormcrawler/news/PreFilterBolt.java @@ -25,62 +25,62 @@ */ public class PreFilterBolt extends BaseRichBolt { - protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - - private URLFilters urlFilters; - - protected OutputCollector collector; - - private final String filterConfigFile; - - private static final String _s = org.apache.stormcrawler.Constants.StatusStreamName; - - public PreFilterBolt(String filterConfigFile) { - this.filterConfigFile = filterConfigFile; - } - - @Override - public void execute(Tuple input) { - - // must have at least a URL and metadata - String urlString = input.getStringByField("url"); - Metadata metadata = (Metadata) input.getValueByField("metadata"); - - String filtered = urlFilters.filter(null, null, urlString); - if (StringUtils.isBlank(filtered)) { - LOG.debug("URL rejected: {}", urlString); - // emit with an error to the status stream - metadata.addValue("error.cause", "Filtered"); - Values v = new Values(urlString, metadata, Status.ERROR); - collector.emit(_s, input, v); - collector.ack(input); - return; - } - - // pass to std out - Values v = new Values(urlString, metadata); - collector.emit(input, v); - collector.ack(input); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declareStream(_s, new Fields("url", "metadata", "status")); - declarer.declare(new Fields("url", "metadata")); - } - - @Override - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - this.collector = collector; - if 
(filterConfigFile != null) { - try { - urlFilters = new URLFilters(stormConf, filterConfigFile); - } catch (IOException e) { - throw new RuntimeException("Can't load filters from " + filterConfigFile); - } - } else { - urlFilters = URLFilters.fromConf(stormConf); - } - } + protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private URLFilters urlFilters; + + protected OutputCollector collector; + + private final String filterConfigFile; + + private static final String _s = org.apache.stormcrawler.Constants.StatusStreamName; + + public PreFilterBolt(String filterConfigFile) { + this.filterConfigFile = filterConfigFile; + } + + @Override + public void execute(Tuple input) { + + // must have at least a URL and metadata + String urlString = input.getStringByField("url"); + Metadata metadata = (Metadata) input.getValueByField("metadata"); + + String filtered = urlFilters.filter(null, null, urlString); + if (StringUtils.isBlank(filtered)) { + LOG.debug("URL rejected: {}", urlString); + // emit with an error to the status stream + metadata.addValue("error.cause", "Filtered"); + Values v = new Values(urlString, metadata, Status.ERROR); + collector.emit(_s, input, v); + collector.ack(input); + return; + } + + // pass to std out + Values v = new Values(urlString, metadata); + collector.emit(input, v); + collector.ack(input); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declareStream(_s, new Fields("url", "metadata", "status")); + declarer.declare(new Fields("url", "metadata")); + } + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector = collector; + if (filterConfigFile != null) { + try { + urlFilters = new URLFilters(stormConf, filterConfigFile); + } catch (IOException e) { + throw new RuntimeException("Can't load filters from " + filterConfigFile); + } + } else { + urlFilters = 
URLFilters.fromConf(stormConf); + } + } } diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/PunycodeURLNormalizer.java b/src/main/java/org/commoncrawl/stormcrawler/news/PunycodeURLNormalizer.java index 4adf03d..b192412 100644 --- a/src/main/java/org/commoncrawl/stormcrawler/news/PunycodeURLNormalizer.java +++ b/src/main/java/org/commoncrawl/stormcrawler/news/PunycodeURLNormalizer.java @@ -39,8 +39,7 @@ private boolean isAscii(String str) { } @Override - public String filter(URL sourceUrl, Metadata sourceMetadata, - String urlToFilter) { + public String filter(URL sourceUrl, Metadata sourceMetadata, String urlToFilter) { try { URL url = new URL(urlToFilter); String hostName = url.getHost(); @@ -51,8 +50,7 @@ public String filter(URL sourceUrl, Metadata sourceMetadata, if (hostName.equals(url.getHost())) { return urlToFilter; } - urlToFilter = new URL(url.getProtocol(), hostName, url.getPort(), - url.getFile()).toString(); + urlToFilter = new URL(url.getProtocol(), hostName, url.getPort(), url.getFile()).toString(); } catch (MalformedURLException e) { return null; } diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/BootstrapTopology.java b/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/BootstrapTopology.java index 821551e..d870bdb 100644 --- a/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/BootstrapTopology.java +++ b/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/BootstrapTopology.java @@ -42,8 +42,7 @@ */ public class BootstrapTopology extends CrawlTopology { - private static final org.slf4j.Logger LOG = LoggerFactory - .getLogger(BootstrapTopology.class); + private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(BootstrapTopology.class); public static void main(String[] args) throws Exception { ConfigurableTopology.start(new BootstrapTopology(), args); @@ -53,12 +52,9 @@ public static void main(String[] args) throws Exception { protected int run(String[] args) { TopologyBuilder builder = 
new TopologyBuilder(); - LOG.debug("sitemap.sniffContent: {}", - ConfUtils.getBoolean(getConf(), "sitemap.sniffContent", false)); - LOG.info("sitemap.sniffContent: {}", - ConfUtils.getBoolean(getConf(), "sitemap.sniffContent", false)); - LOG.warn("sitemap.sniffContent: {}", - ConfUtils.getBoolean(getConf(), "sitemap.sniffContent", false)); + LOG.debug("sitemap.sniffContent: {}", ConfUtils.getBoolean(getConf(), "sitemap.sniffContent", false)); + LOG.info("sitemap.sniffContent: {}", ConfUtils.getBoolean(getConf(), "sitemap.sniffContent", false)); + LOG.warn("sitemap.sniffContent: {}", ConfUtils.getBoolean(getConf(), "sitemap.sniffContent", false)); int numWorkers = ConfUtils.getInt(getConf(), "topology.workers", 1); @@ -69,34 +65,26 @@ protected int run(String[] args) { if (args.length >= 2) { // arguments include seed directory and file pattern LOG.info("Injecting seeds from {} by pattern {}", args[0], args[1]); - builder.setSpout("filespout", - new FileSpout(args[0], args[1], true)); + builder.setSpout("filespout", new FileSpout(args[0], args[1], true)); Fields key = new Fields("url"); - builder.setBolt("filter", new URLFilterBolt()).fieldsGrouping( - "filespout", Constants.StatusStreamName, key); + builder.setBolt("filter", new URLFilterBolt()).fieldsGrouping("filespout", Constants.StatusStreamName, key); } builder.setSpout("spout", new AggregationSpout(), numShards); - builder.setBolt("partitioner", new URLPartitionerBolt(), numWorkers) - .shuffleGrouping("spout"); + builder.setBolt("partitioner", new URLPartitionerBolt(), numWorkers).shuffleGrouping("spout"); - builder.setBolt("fetch", new FetcherBolt(), numWorkers) - .fieldsGrouping("partitioner", new Fields("key")); + builder.setBolt("fetch", new FetcherBolt(), numWorkers).fieldsGrouping("partitioner", new Fields("key")); - builder.setBolt("sitemap", new NewsSiteMapDetectorBolt(), numWorkers) - .localOrShuffleGrouping("fetch"); + builder.setBolt("sitemap", new NewsSiteMapDetectorBolt(), 
numWorkers).localOrShuffleGrouping("fetch"); - builder.setBolt("feed", new FeedDetectorBolt(), numWorkers) - .localOrShuffleGrouping("sitemap"); + builder.setBolt("feed", new FeedDetectorBolt(), numWorkers).localOrShuffleGrouping("sitemap"); - builder.setBolt("parse", new JSoupParserBolt()) - .localOrShuffleGrouping("feed"); + builder.setBolt("parse", new JSoupParserBolt()).localOrShuffleGrouping("feed"); // don't need to parse the pages but need to update their status - builder.setBolt("ssb", new DummyIndexer(), numWorkers) - .localOrShuffleGrouping("parse"); + builder.setBolt("ssb", new DummyIndexer(), numWorkers).localOrShuffleGrouping("parse"); WARCHdfsBolt warcbolt = getWarcBolt("CC-NEWS-BOOTSTRAP"); @@ -109,8 +97,7 @@ protected int run(String[] args) { .localOrShuffleGrouping("parse", Constants.StatusStreamName) .localOrShuffleGrouping("ssb", Constants.StatusStreamName) .setNumTasks(numShards) - .customGrouping("filter", Constants.StatusStreamName, - new URLStreamGrouping()); + .customGrouping("filter", Constants.StatusStreamName, new URLStreamGrouping()); return submit(conf, builder); } diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/FeedLinkParseFilter.java b/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/FeedLinkParseFilter.java index 5707189..6ac664d 100644 --- a/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/FeedLinkParseFilter.java +++ b/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/FeedLinkParseFilter.java @@ -27,6 +27,7 @@ * ParseFilter which extracts exclusively RSS links via Xpath, all other links * are skipped. See {@link LinkParseFilter} how to register and configure in * parsefilters.json. A configuration snippet: + * *
  *     {
  *      "class": "org.commoncrawl.stormcrawler.news.bootstrap.FeedLinkParseFilter",
@@ -41,12 +42,10 @@
  */
 public class FeedLinkParseFilter extends LinkParseFilter {
 
-    private static final org.slf4j.Logger LOG = LoggerFactory
-            .getLogger(FeedLinkParseFilter.class);
+    private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(FeedLinkParseFilter.class);
 
     @Override
-    public void filter(String URL, byte[] content, DocumentFragment doc,
-            ParseResult parse) {
+    public void filter(String URL, byte[] content, DocumentFragment doc, ParseResult parse) {
 
         // skip existing links
         logLinks(parse, URL, "Skipped links");
diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/NewsSiteMapDetectorBolt.java b/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/NewsSiteMapDetectorBolt.java
index 1160201..2e1011b 100644
--- a/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/NewsSiteMapDetectorBolt.java
+++ b/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/NewsSiteMapDetectorBolt.java
@@ -40,16 +40,14 @@
 @SuppressWarnings("serial")
 public class NewsSiteMapDetectorBolt extends SiteMapParserBolt {
 
-    private static final org.slf4j.Logger LOG = LoggerFactory
-            .getLogger(NewsSiteMapDetectorBolt.class);
+    private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(NewsSiteMapDetectorBolt.class);
 
     protected static final int maxOffsetContentGuess = 1024;
-    private static ContentDetector contentDetector = new ContentDetector(
-            NewsSiteMapParserBolt.contentClues, maxOffsetContentGuess);
+    private static ContentDetector contentDetector = new ContentDetector(NewsSiteMapParserBolt.contentClues,
+            maxOffsetContentGuess);
 
     private ParseFilter parseFilters;
 
-
     @Override
     public void execute(Tuple tuple) {
         Metadata metadata = (Metadata) tuple.getValueByField("metadata");
@@ -57,10 +55,8 @@ public void execute(Tuple tuple) {
         byte[] content = tuple.getBinaryByField("content");
         String url = tuple.getStringByField("url");
 
-        boolean isSitemap = Boolean.valueOf(
-                metadata.getFirstValue(SiteMapParserBolt.isSitemapKey));
-        boolean isNewsSitemap = Boolean.valueOf(
-                metadata.getFirstValue(NewsSiteMapParserBolt.isSitemapNewsKey));
+        boolean isSitemap = Boolean.valueOf(metadata.getFirstValue(SiteMapParserBolt.isSitemapKey));
+        boolean isNewsSitemap = Boolean.valueOf(metadata.getFirstValue(NewsSiteMapParserBolt.isSitemapNewsKey));
 
         if (!isNewsSitemap || !isSitemap) {
             int match = contentDetector.getFirstMatch(content);
@@ -70,10 +66,8 @@ public void execute(Tuple tuple) {
                 metadata.setValue(SiteMapParserBolt.isSitemapKey, "true");
                 if (match <= NewsSiteMapParserBolt.contentCluesSitemapNewsMatchUpTo) {
                     isNewsSitemap = true;
-                    LOG.info("{} detected as news sitemap based on content",
-                            url);
-                    metadata.setValue(NewsSiteMapParserBolt.isSitemapNewsKey,
-                            "true");
+                    LOG.info("{} detected as news sitemap based on content", url);
+                    metadata.setValue(NewsSiteMapParserBolt.isSitemapNewsKey, "true");
                 }
             }
         }
@@ -85,8 +79,7 @@ public void execute(Tuple tuple) {
             parseData.setMetadata(metadata);
             parseFilters.filter(url, content, null, parse);
             // emit status
-            collector.emit(Constants.StatusStreamName, tuple,
-                    new Values(url, metadata, Status.FETCHED));
+            collector.emit(Constants.StatusStreamName, tuple, new Values(url, metadata, Status.FETCHED));
         } else {
             // pass on
             collector.emit(tuple, tuple.getValues());
@@ -95,9 +88,8 @@ public void execute(Tuple tuple) {
     }
 
     @Override
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    public void prepare(Map stormConf, TopologyContext context,
-            OutputCollector collect) {
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collect) {
         super.prepare(stormConf, context, collect);
         parseFilters = ParseFilters.fromConf(stormConf);
     }
diff --git a/src/test/java/org/commoncrawl/stormcrawler/FastURLFilterTest.java b/src/test/java/org/commoncrawl/stormcrawler/FastURLFilterTest.java
index 64df0ba..aa05f36 100644
--- a/src/test/java/org/commoncrawl/stormcrawler/FastURLFilterTest.java
+++ b/src/test/java/org/commoncrawl/stormcrawler/FastURLFilterTest.java
@@ -35,53 +35,53 @@ public class FastURLFilterTest {
 
     @BeforeClass
     public static void init() {
-	filter = createFilter("fast-urlfilter.txt");
+        filter = createFilter("fast-urlfilter.txt");
     }
 
     public static FastURLFilter createFilter(String fileName) {
-	ObjectNode filterParams = new ObjectNode(JsonNodeFactory.instance);
-	filterParams.put("file", fileName);
-	FastURLFilter filter = new FastURLFilter();
-	Map<String, Object> conf = new HashMap<>();
-	conf.put("fast.urlfilter.refresh", 10);
-	filter.configure(conf, filterParams);
-	return filter;
+        ObjectNode filterParams = new ObjectNode(JsonNodeFactory.instance);
+        filterParams.put("file", fileName);
+        FastURLFilter filter = new FastURLFilter();
+        Map<String, Object> conf = new HashMap<>();
+        conf.put("fast.urlfilter.refresh", 10);
+        filter.configure(conf, filterParams);
+        return filter;
     }
 
     @Test
     public void testHostFilter() throws MalformedURLException {
-	URL url = new URL("http://may.go.com/image.jpg");
-	Metadata metadata = new Metadata();
-	String filterResult = filter.filter(url, metadata, url.toExternalForm());
-	Assert.assertEquals(url.toString(), filterResult);
-	
-	url = new URL("http://no.go.com/");
-	filterResult = filter.filter(url, metadata, url.toExternalForm());
-	Assert.assertEquals(null, filterResult);
+        URL url = new URL("http://may.go.com/image.jpg");
+        Metadata metadata = new Metadata();
+        String filterResult = filter.filter(url, metadata, url.toExternalForm());
+        Assert.assertEquals(url.toString(), filterResult);
+
+        url = new URL("http://no.go.com/");
+        filterResult = filter.filter(url, metadata, url.toExternalForm());
+        Assert.assertEquals(null, filterResult);
     }
 
     @Test
     public void testDomainNotAllowed() throws MalformedURLException {
-	URL url = new URL("http://domainnotallowed.com/forum/search.php");
-	Metadata metadata = new Metadata();
-	String filterResult = filter.filter(url, metadata, url.toExternalForm());
-	Assert.assertEquals(null, filterResult);
-	
-	url = new URL("http://domainnotallowed.com/");
-	filterResult = filter.filter(url, metadata, url.toExternalForm());
-	Assert.assertEquals(null, filterResult);
-	
-	url = new URL("http://partiallyallowed.com/");
-	filterResult = filter.filter(url, metadata, url.toExternalForm());
-	Assert.assertEquals(url.toString(), filterResult);
-	
-	url = new URL("http://partiallyallowed.com/verbotten");
-	filterResult = filter.filter(url, metadata, url.toExternalForm());
-	Assert.assertEquals(null, filterResult);
+        URL url = new URL("http://domainnotallowed.com/forum/search.php");
+        Metadata metadata = new Metadata();
+        String filterResult = filter.filter(url, metadata, url.toExternalForm());
+        Assert.assertEquals(null, filterResult);
+
+        url = new URL("http://domainnotallowed.com/");
+        filterResult = filter.filter(url, metadata, url.toExternalForm());
+        Assert.assertEquals(null, filterResult);
+
+        url = new URL("http://partiallyallowed.com/");
+        filterResult = filter.filter(url, metadata, url.toExternalForm());
+        Assert.assertEquals(url.toString(), filterResult);
+
+        url = new URL("http://partiallyallowed.com/verbotten");
+        filterResult = filter.filter(url, metadata, url.toExternalForm());
+        Assert.assertEquals(null, filterResult);
 
-	// allowed
-	url = new URL("http://digitalpebble.com/");
-	filterResult = filter.filter(url, metadata, url.toExternalForm());
-	Assert.assertEquals(url.toString(), filterResult);
+        // allowed
+        url = new URL("http://digitalpebble.com/");
+        filterResult = filter.filter(url, metadata, url.toExternalForm());
+        Assert.assertEquals(url.toString(), filterResult);
     }
 }
diff --git a/src/test/java/org/commoncrawl/stormcrawler/news/NewsSiteMapParserTest.java b/src/test/java/org/commoncrawl/stormcrawler/news/NewsSiteMapParserTest.java
index db73d67..db0fedd 100644
--- a/src/test/java/org/commoncrawl/stormcrawler/news/NewsSiteMapParserTest.java
+++ b/src/test/java/org/commoncrawl/stormcrawler/news/NewsSiteMapParserTest.java
@@ -41,59 +41,62 @@ public class NewsSiteMapParserTest extends ParsingTester {
 
     @Before
     public void setupParserBolt() {
-	setupParserBolt(new NewsSiteMapParserBolt());
-	Map config = new HashMap<>();
-	config.put("sitemap.sniffContent", true);
-	// allow items published during the last week
-	config.put("sitemap.filter.hours.since.modified", 168);
-	prepareParserBolt("test.parsefilters.json", config);
+        setupParserBolt(new NewsSiteMapParserBolt());
+        Map config = new HashMap<>();
+        config.put("sitemap.sniffContent", true);
+        // allow items published during the last week
+        config.put("sitemap.filter.hours.since.modified", 168);
+        prepareParserBolt("test.parsefilters.json", config);
     }
 
     @Test
     public void testSiteMapParser() throws IOException, UnknownFormatException {
-	String url = "https://example.org/sitemap-news.xml";
-	byte[] content = readContent("sitemap-news.xml");
-	String contentType = "";
-	Metadata parentMetadata = new Metadata();
-	List links = new ArrayList<>();
-
-	SitemapType type = ((NewsSiteMapParserBolt) bolt).detectContent(url, content);
-	assertEquals(SitemapType.NEWS, type);
-
-	((NewsSiteMapParserBolt) bolt).parseSiteMap(url, content, contentType, parentMetadata, links);
-
-	// unmodified sitemap:
-	// - publication date is far in the past, link should be skipped
-	// 2008-12-23
-	assertEquals("Outdated link not skipped", 0, links.size());
-
-	// now set the publication date to yesterday
-	LocalDateTime yesterday = LocalDateTime.now().minusDays(1);
-	content = (new String(content, StandardCharsets.UTF_8))
-		.replace("2008-12-23", ""
-			+ yesterday.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")) + "")
-		.getBytes(StandardCharsets.UTF_8);
-	((NewsSiteMapParserBolt) bolt).parseSiteMap(url, content, contentType, parentMetadata, links);
-
-	assertEquals("Expected one  and one additional  link - image links are ignored", 2,
-		links.size());
+        String url = "https://example.org/sitemap-news.xml";
+        byte[] content = readContent("sitemap-news.xml");
+        String contentType = "";
+        Metadata parentMetadata = new Metadata();
+        List links = new ArrayList<>();
+
+        SitemapType type = ((NewsSiteMapParserBolt) bolt).detectContent(url, content);
+        assertEquals(SitemapType.NEWS, type);
+
+        ((NewsSiteMapParserBolt) bolt).parseSiteMap(url, content, contentType, parentMetadata, links);
+
+        // unmodified sitemap:
+        // - publication date is far in the past, link should be skipped
+        // 2008-12-23
+        assertEquals("Outdated link not skipped", 0, links.size());
+
+        // now set the publication date to yesterday
+        LocalDateTime yesterday = LocalDateTime.now().minusDays(1);
+        content = (new String(content, StandardCharsets.UTF_8))
+                .replace(
+                        "2008-12-23",
+                        ""
+                                + yesterday.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))
+                                + "")
+                .getBytes(StandardCharsets.UTF_8);
+        ((NewsSiteMapParserBolt) bolt).parseSiteMap(url, content, contentType, parentMetadata, links);
+
+        assertEquals(
+                "Expected one  and one additional  link - image links are ignored",
+                2,
+                links.size());
     }
 
     protected byte[] readContent(String filename) throws IOException {
-	ByteArrayOutputStream baos = new ByteArrayOutputStream();
-	IOUtils.copy(getClass().getClassLoader().getResourceAsStream(filename), baos);
-	return baos.toByteArray();
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        IOUtils.copy(getClass().getClassLoader().getResourceAsStream(filename), baos);
+        return baos.toByteArray();
     }
 
-	@Test
-     public void testFeedWithSitemapNamespace() throws IOException, UnknownFormatException {
-         String url = "https://example.org/feed.xml";
-		byte[] content = readContent("feed-with-sitemap-namespace.xml");
-         SitemapType type = ((NewsSiteMapParserBolt) bolt).detectContent(url, content);
-         assertNotEquals("RSS feed with sitemap namespace should not be detected as sitemap",
-                 SitemapType.NEWS, type);
-         assertNotEquals("RSS feed with sitemap namespace should not be detected as sitemap",
-                 SitemapType.SITEMAP, type);
-     }
+    @Test
+    public void testFeedWithSitemapNamespace() throws IOException, UnknownFormatException {
+        String url = "https://example.org/feed.xml";
+        byte[] content = readContent("feed-with-sitemap-namespace.xml");
+        SitemapType type = ((NewsSiteMapParserBolt) bolt).detectContent(url, content);
+        assertNotEquals("RSS feed with sitemap namespace should not be detected as sitemap", SitemapType.NEWS, type);
+        assertNotEquals("RSS feed with sitemap namespace should not be detected as sitemap", SitemapType.SITEMAP, type);
+    }
 
 }

From cf56e2b692ef2caacb2097e9e896a4a8e5c431b7 Mon Sep 17 00:00:00 2001
From: Luca Foppiano 
Date: Tue, 21 Apr 2026 14:50:11 +0100
Subject: [PATCH 05/10] chore: make spotless run on the CI

---
 .github/workflows/maven.yml | 2 ++
 pom.xml                     | 7 +++++++
 2 files changed, 9 insertions(+)

diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml
index dc52ef3..50f335c 100644
--- a/.github/workflows/maven.yml
+++ b/.github/workflows/maven.yml
@@ -63,5 +63,7 @@ jobs:
       with:
         distribution: adopt
         java-version: ${{ matrix.java }}
+    - name: Check code formatting
+      run: mvn -B --no-transfer-progress spotless:check
     - name: Build with Maven
       run: mvn -B --no-transfer-progress package --file pom.xml -DCI_ENV=true verify
diff --git a/pom.xml b/pom.xml
index 441c23c..9cd1aca 100644
--- a/pom.xml
+++ b/pom.xml
@@ -228,6 +228,13 @@ under the License.
 						
 					
 				
+				
+					
+						
+							check
+						
+					
+				
 			
 		
 	

From 83e2abc07c79b8b17118d7a86bc1d2a01b8c666d Mon Sep 17 00:00:00 2001
From: Luca Foppiano 
Date: Mon, 15 Jun 2026 16:26:11 +0100
Subject: [PATCH 06/10] feat: replace spotless with
 git-code-format-maven-plugin for code formatting

Signed-off-by: Luca Foppiano 
---
 .github/workflows/maven.yml |   2 +-
 .mvn/jvm.config             |   8 ++
 eclipse-formatter.xml       | 176 ------------------------------------
 pom.xml                     |  72 ++++++++-------
 4 files changed, 50 insertions(+), 208 deletions(-)
 create mode 100644 .mvn/jvm.config
 delete mode 100644 eclipse-formatter.xml

diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml
index 50f335c..5c79a4c 100644
--- a/.github/workflows/maven.yml
+++ b/.github/workflows/maven.yml
@@ -64,6 +64,6 @@ jobs:
         distribution: adopt
         java-version: ${{ matrix.java }}
     - name: Check code formatting
-      run: mvn -B --no-transfer-progress spotless:check
+      run: mvn -B --no-transfer-progress com.cosium.code:git-code-format-maven-plugin:validate-code-format -Dskip.format.code=false
     - name: Build with Maven
       run: mvn -B --no-transfer-progress package --file pom.xml -DCI_ENV=true verify
diff --git a/.mvn/jvm.config b/.mvn/jvm.config
new file mode 100644
index 0000000..87ae20c
--- /dev/null
+++ b/.mvn/jvm.config
@@ -0,0 +1,8 @@
+--add-exports jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED
+--add-exports jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED
+--add-exports jdk.compiler/com.sun.tools.javac.main=ALL-UNNAMED
+--add-exports jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED
+--add-exports jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED
+--add-exports jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED
+--add-opens jdk.compiler/com.sun.tools.javac.code=ALL-UNNAMED
+--add-opens jdk.compiler/com.sun.tools.javac.comp=ALL-UNNAMED
diff --git a/eclipse-formatter.xml b/eclipse-formatter.xml
deleted file mode 100644
index e9782b8..0000000
--- a/eclipse-formatter.xml
+++ /dev/null
@@ -1,176 +0,0 @@
-
-
-	
-		  
-        
-        
-        
-        
-
-        
-        
-        
-
-        
-        
-        
-        
-
-        
-        
-        
-        
-
-        
-        
-        
-        
-        
-        
-        
-        
-        
-        
-        
-
-
-        
-        
-        
-
-        
-        
-        
-        
-        
-        
-        
-        
-        
-        
-        
-        
-
-        
-        
-        
-        
-
-        
-        
-        
-        
-        
-        
-        
-        
-        
-        
-        
-
-        
-        
-        
-        
-        
-        
-        
-        
-        
-        
-        
-        
-        
-        
-        
-        
-        
-        
-        
-        
-
-        
-        
-        
-        
-        
-        
-        
-        
-        
-        
-
-        
-        
-        
-        
-        
-
-        
-        
-        
-
-        
-        
-        
-        
-
-        
-        
-        
-        
-        
-        
-        
-        
-        
-        
-        
-        
-        
-
-        
-        
-        
-        
-        
-        
-        
-        
-        
-        
-        
-
-        
-        
-        
-        
-
-        
-        
-        
-        
-
-        
-        
-        
-        
-        
-	
-
diff --git a/pom.xml b/pom.xml
index 9cd1aca..69b7487 100644
--- a/pom.xml
+++ b/pom.xml
@@ -46,6 +46,8 @@ under the License.
 		5.23.0
 		3.0.1
 		4.13.2
+		5.4
+		true
 	
 
 	
@@ -205,37 +207,45 @@ under the License.
 				
 			
 			
-				com.diffplug.spotless
-				spotless-maven-plugin
-				2.46.1
-				
-					
-						
-						
-							pom.xml
-						
-						
-							all
-							true
-							false
-							-1
-							recommended_2008_06
-						
-					
-					
-						
-							${project.basedir}/eclipse-formatter.xml
-						
-					
-				
-				
-					
-						
-							check
-						
-					
-				
-			
+                com.cosium.code
+                git-code-format-maven-plugin
+                ${git-code-format-maven-plugin.version}
+                
+                    
+                    
+                        install-formatter-hook
+                        
+                            install-hooks
+                        
+                    
+                    
+                    
+                        validate-code-format
+                        
+                            validate-code-format
+                        
+                    
+                
+                
+                    
+                    
+                        com.cosium.code
+                        google-java-format
+                        ${git-code-format-maven-plugin.version}
+                    
+                
+                
+                    ${skip.format.code}
+                    
+                        true
+                        false
+                        false
+                        false
+                    
+                
+            
 		
 	
 

From 05e190297e2ec25f26c862c4b24784e36363e69b Mon Sep 17 00:00:00 2001
From: Luca Foppiano 
Date: Mon, 15 Jun 2026 16:26:53 +0100
Subject: [PATCH 07/10] chore: reformat with Cosium

Signed-off-by: Luca Foppiano 
---
 .../stormcrawler/filter/FastURLFilter.java    | 179 +++++++++---------
 .../stormcrawler/news/ContentDetector.java    |  26 ++-
 .../stormcrawler/news/CrawlTopology.java      |  70 +++----
 .../stormcrawler/news/FeedDetectorBolt.java   |  13 +-
 .../news/NewsSiteMapParserBolt.java           | 139 ++++++++------
 .../stormcrawler/news/PreFilterBolt.java      |  10 +-
 .../news/PunycodeURLNormalizer.java           |  10 +-
 .../news/bootstrap/BootstrapTopology.java     |  61 +++---
 .../news/bootstrap/FeedLinkParseFilter.java   |  21 +-
 .../bootstrap/NewsSiteMapDetectorBolt.java    |  25 ++-
 .../stormcrawler/FastURLFilterTest.java       |  10 +-
 .../news/NewsSiteMapParserTest.java           |  44 +++--
 12 files changed, 315 insertions(+), 293 deletions(-)

diff --git a/src/main/java/org/commoncrawl/stormcrawler/filter/FastURLFilter.java b/src/main/java/org/commoncrawl/stormcrawler/filter/FastURLFilter.java
index e0b8a7b..0b2b04d 100644
--- a/src/main/java/org/commoncrawl/stormcrawler/filter/FastURLFilter.java
+++ b/src/main/java/org/commoncrawl/stormcrawler/filter/FastURLFilter.java
@@ -1,21 +1,29 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * 

http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and * limitations under the License. */ package org.commoncrawl.stormcrawler.filter; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.S3Object; +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.LinkedHashMultimap; +import com.google.common.collect.Multimap; import java.io.BufferedInputStream; import java.io.BufferedReader; import java.io.IOException; @@ -29,7 +37,6 @@ import java.util.TimerTask; import java.util.regex.Pattern; import java.util.zip.GZIPInputStream; - import org.apache.commons.lang3.StringUtils; import org.apache.stormcrawler.JSONResource; import org.apache.stormcrawler.Metadata; @@ -38,28 +45,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.AmazonS3ClientBuilder; -import com.amazonaws.services.s3.model.GetObjectRequest; -import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.S3Object; -import com.fasterxml.jackson.core.JsonParseException; -import com.fasterxml.jackson.databind.JsonMappingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.collect.LinkedHashMultimap; -import com.google.common.collect.Multimap; - /** - * Version of the FastURLFilter that can load from a text representation instead - * of the JSON that the SC version handles. 
Can also reload periodically and get - * its content from S3. - * - * Filters URLs based on a file of regular expressions using host/domains - * matching first. The default policy is to accept a URL if no matches are - * found. + * Version of the FastURLFilter that can load from a text representation instead of the JSON that + * the SC version handles. Can also reload periodically and get its content from S3. + * + *

Filters URLs based on a file of regular expressions using host/domains matching first. The + * default policy is to accept a URL if no matches are found. + * + *

Rule Format: * - * Rule Format: - * *

  * Host www.example.org
  *   DenyPath /path/to/be/excluded
@@ -72,48 +66,45 @@
  * Domain example.org
  *   DenyPathQuery /resource/.*?action=exclude
  * 
- * - * Host rules are evaluated before Domain rules. For - * Host rules the entire host name of a URL must match while the - * domain names in Domain rules are considered as matches if the - * domain is a suffix of the host name (consisting of complete host name parts). - * Shorter domain suffixes are checked first, a single dot - * "." as "domain name" can be used to specify - * global rules applied to every URL. - * - * E.g., for "www.example.com" the rules given above are looked up in the - * following order: + * + * Host rules are evaluated before Domain rules. For Host + * rules the entire host name of a URL must match while the domain names in Domain + * rules are considered as matches if the domain is a suffix of the host name (consisting of + * complete host name parts). Shorter domain suffixes are checked first, a single dot ". + * " as "domain name" can be used to specify global rules applied to every + * URL. + * + *

E.g., for "www.example.com" the rules given above are looked up in the following order: + * *

    - *
  1. check "www.example.com" whether host-based rules exist and whether one of - * them matches
  2. - *
  3. check "www.example.com" for domain-based rules
  4. - *
  5. check "example.com" for domain-based rules
  6. - *
  7. check "com" for domain-based rules
  8. - *
  9. check for global rules ("Domain .")
  10. + *
  11. check "www.example.com" whether host-based rules exist and whether one of them matches + *
  12. check "www.example.com" for domain-based rules + *
  13. check "example.com" for domain-based rules + *
  14. check "com" for domain-based rules + *
  15. check for global rules ("Domain .") *
- * The first matching rule will reject the URL and no further rules are checked. - * If no rule matches the URL is accepted. URLs without a host name (e.g., - * file:/path/file.txt are checked for global rules only. URLs - * which fail to be parsed as {@link java.net.URL} are always rejected. - * - * For rules either the URL path (DenyPath) or path and query - * (DenyPathQuery) are checked whether the given - * {@link java.util.regex Java Regular expression} is found (see - * {@link java.util.regex.Matcher#find()}) in the URL path (and query). - * - * Rules are applied in the order of their definition. For better performance, - * regular expressions which are simpler/faster or match more URLs should be - * defined earlier. - * - * Comments in the rule file start with the # character and reach - * until the end of the line. - * - * The rules file is defined via the property urlfilter.fast.file, - * the default name is fast-urlfilter.txt. + * + * The first matching rule will reject the URL and no further rules are checked. If no rule matches + * the URL is accepted. URLs without a host name (e.g., file:/path/file.txt are checked + * for global rules only. URLs which fail to be parsed as {@link java.net.URL} are always rejected. + * + *

For rules either the URL path (DenyPath) or path and query (DenyPathQuery + * ) are checked whether the given {@link java.util.regex Java Regular expression} is found + * (see {@link java.util.regex.Matcher#find()}) in the URL path (and query). + * + *

Rules are applied in the order of their definition. For better performance, regular + * expressions which are simpler/faster or match more URLs should be defined earlier. + * + *

Comments in the rule file start with the # character and reach until the end of + * the line. + * + *

The rules file is defined via the property urlfilter.fast.file, the default name + * is fast-urlfilter.txt. */ public class FastURLFilter extends URLFilter implements JSONResource { - protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + protected static final Logger LOG = + LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); public static final String URLFILTER_FAST_FILE = "urlfilter.fast.file"; private Multimap hostRules = LinkedHashMultimap.create(); @@ -121,7 +112,8 @@ public class FastURLFilter extends URLFilter implements JSONResource { private String resourceFile; - private static final Pattern CATCH_ALL_RULE = Pattern.compile("^\\s*DenyPath(?:Query)?\\s+\\.[*?]\\s*$"); + private static final Pattern CATCH_ALL_RULE = + Pattern.compile("^\\s*DenyPath(?:Query)?\\s+\\.[*?]\\s*$"); private String resourceETAG; @@ -151,24 +143,28 @@ public void configure(@SuppressWarnings("rawtypes") Map stormConf, JsonNode filt if (refreshRate != -1) { LOG.info("Filter set to reload from {} every {} sec", getResourceFile(), refreshRate); - new Timer().schedule(new TimerTask() { - public void run() { - LOG.info("Reloading resources"); - try { - loadJSONResources(); - } catch (Exception e) { - LOG.error("Can't load resources", e); - } - } - }, refreshRate * 1000, refreshRate * 1000); + new Timer() + .schedule( + new TimerTask() { + public void run() { + LOG.info("Reloading resources"); + try { + loadJSONResources(); + } catch (Exception e) { + LOG.error("Can't load resources", e); + } + } + }, + refreshRate * 1000, + refreshRate * 1000); } } /** * Load the resources from the JSON file in the uber jar or from S3 - * + * * @throws Exception - **/ + */ @Override public void loadJSONResources() throws Exception { InputStream inputStream = null; @@ -248,7 +244,10 @@ public String filter(URL sourceUrl, Metadata sourceMetadata, String urlToFilter) try { u = new URL(urlToFilter); } catch (Exception e) { - LOG.debug("Rejected {} 
because failed to parse as URL: {}", urlToFilter, e.getMessage()); + LOG.debug( + "Rejected {} because failed to parse as URL: {}", + urlToFilter, + e.getMessage()); return null; } @@ -344,7 +343,11 @@ private void reloadRules(Reader rules) throws IOException { continue; } } catch (Exception e) { - LOG.warn("Problem reading rule on line {}: {} - {}", lineno, line, e.getMessage()); + LOG.warn( + "Problem reading rule on line {}: {} - {}", + lineno, + line, + e.getMessage()); continue; } @@ -359,7 +362,10 @@ private void reloadRules(Reader rules) throws IOException { } } catch (IOException e) { - LOG.warn("Caught exception while reading rules file at line {}: {}", lineno, e.getMessage()); + LOG.warn( + "Caught exception while reading rules file at line {}: {}", + lineno, + e.getMessage()); throw e; } } @@ -368,8 +374,7 @@ private void reloadRules(Reader rules) throws IOException { public static class Rule { protected Pattern pattern; - Rule() { - } + Rule() {} public Rule(String regex) { pattern = Pattern.compile(regex); diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/ContentDetector.java b/src/main/java/org/commoncrawl/stormcrawler/news/ContentDetector.java index 7305195..9979781 100644 --- a/src/main/java/org/commoncrawl/stormcrawler/news/ContentDetector.java +++ b/src/main/java/org/commoncrawl/stormcrawler/news/ContentDetector.java @@ -13,27 +13,24 @@ */ package org.commoncrawl.stormcrawler.news; +import com.google.common.primitives.Bytes; import java.nio.charset.StandardCharsets; import java.util.Arrays; -import com.google.common.primitives.Bytes; - public class ContentDetector { protected byte[][][] clues; protected int maxOffset; /** - * Set up detector to detect content sniffing for a set of clue strings in a - * prefix of the binary content. + * Set up detector to detect content sniffing for a set of clue strings in a prefix of the + * binary content. * - * @param clues nested list of literal clues. 
Outer list defines an - * OR-group, inner list contained ANDed clues required to match - * all, e.g. the following definition would match if either - * "clue1" and "and_clue2" are matched, or - * alternatively "or_clue3" is found - * - *

+     * @param clues nested list of literal clues. Outer list defines an OR-group, inner list
+     *     contained ANDed clues required to match all, e.g. the following definition would match if
+     *     either "clue1" and "and_clue2" are matched, or alternatively
+     *     "or_clue3" is found
+     *     
      *                  { { clue1, and_clue2 }, { or_clue3 } }
      *                  
* @@ -54,11 +51,11 @@ public int getFirstMatch(byte[] content) { if (content.length > maxOffset) { beginning = Arrays.copyOfRange(content, 0, maxOffset); } - OR : for (int i = 0; i < clues.length; i++) { + OR: + for (int i = 0; i < clues.length; i++) { byte[][] group = clues[i]; for (byte[] clue : group) { - if (Bytes.indexOf(beginning, clue) == -1) - continue OR; + if (Bytes.indexOf(beginning, clue) == -1) continue OR; } // success, all members of one group matched return i; @@ -69,5 +66,4 @@ public int getFirstMatch(byte[] content) { public boolean matches(byte[] content) { return (getFirstMatch(content) >= 0); } - } diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java b/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java index 3c5d247..9368d2c 100644 --- a/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java +++ b/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java @@ -1,26 +1,22 @@ /** - * Licensed to DigitalPebble Ltd under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * DigitalPebble licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * Licensed to DigitalPebble Ltd under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. + * DigitalPebble licenses this file to You under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. You may obtain a copy of the + * License at * - * http://www.apache.org/licenses/LICENSE-2.0 + *

http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and * limitations under the License. */ - package org.commoncrawl.stormcrawler.news; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; - import org.apache.storm.topology.BoltDeclarer; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields; @@ -43,9 +39,7 @@ import org.apache.stormcrawler.warc.WARCHdfsBolt; import org.slf4j.LoggerFactory; -/** - * Dummy topology to play with the spouts and bolts on OpenSearch - */ +/** Dummy topology to play with the spouts and bolts on OpenSearch */ public class CrawlTopology extends ConfigurableTopology { private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(CrawlTopology.class); @@ -70,22 +64,28 @@ protected int run(String[] args) { builder.setSpout("filespout", new FileSpout(args[0], args[1], true)); Fields key = new Fields("url"); - builder.setBolt("filter", new URLFilterBolt()).fieldsGrouping("filespout", Constants.StatusStreamName, key); + builder.setBolt("filter", new URLFilterBolt()) + .fieldsGrouping("filespout", Constants.StatusStreamName, key); } builder.setSpout("spout", new AggregationSpout(), numShards); - builder.setBolt("prefilter", new PreFilterBolt("pre-urlfilters.json"), numWorkers).shuffleGrouping("spout"); + builder.setBolt("prefilter", new PreFilterBolt("pre-urlfilters.json"), numWorkers) + .shuffleGrouping("spout"); - builder.setBolt("partitioner", new URLPartitionerBolt(), numWorkers).shuffleGrouping("prefilter"); + builder.setBolt("partitioner", new URLPartitionerBolt(), numWorkers) + .shuffleGrouping("prefilter"); - builder.setBolt("fetch", new FetcherBolt(), numWorkers).fieldsGrouping("partitioner", new Fields("key")); + builder.setBolt("fetch", new 
FetcherBolt(), numWorkers) + .fieldsGrouping("partitioner", new Fields("key")); builder.setBolt("sitemap", new NewsSiteMapParserBolt(), numWorkers) .setNumTasks(2) .localOrShuffleGrouping("fetch"); - builder.setBolt("feed", new FeedParserBolt(), numWorkers).setNumTasks(4).localOrShuffleGrouping("sitemap"); + builder.setBolt("feed", new FeedParserBolt(), numWorkers) + .setNumTasks(4) + .localOrShuffleGrouping("sitemap"); // don't need to parse the pages but need to update their status builder.setBolt("ssb", new DummyIndexer(), numWorkers).localOrShuffleGrouping("feed"); @@ -98,15 +98,17 @@ protected int run(String[] args) { final Fields furl = new Fields("url"); - BoltDeclarer statusBolt = builder.setBolt("status", new StatusUpdaterBolt(), numWorkers) - .fieldsGrouping("fetch", Constants.StatusStreamName, furl) - .fieldsGrouping("sitemap", Constants.StatusStreamName, furl) - .fieldsGrouping("feed", Constants.StatusStreamName, furl) - .fieldsGrouping("ssb", Constants.StatusStreamName, furl) - .fieldsGrouping("prefilter", Constants.StatusStreamName, furl); + BoltDeclarer statusBolt = + builder.setBolt("status", new StatusUpdaterBolt(), numWorkers) + .fieldsGrouping("fetch", Constants.StatusStreamName, furl) + .fieldsGrouping("sitemap", Constants.StatusStreamName, furl) + .fieldsGrouping("feed", Constants.StatusStreamName, furl) + .fieldsGrouping("ssb", Constants.StatusStreamName, furl) + .fieldsGrouping("prefilter", Constants.StatusStreamName, furl); if (args.length >= 2) { - statusBolt.customGrouping("filter", Constants.StatusStreamName, new URLStreamGrouping()); + statusBolt.customGrouping( + "filter", Constants.StatusStreamName, new URLStreamGrouping()); } statusBolt.setNumTasks(numShards); @@ -127,12 +129,15 @@ protected WARCHdfsBolt getWarcBolt(String filePrefix) { String userAgent = AbstractHttpProtocol.getAgentString(getConf()); fields.put("http-header-user-agent", userAgent); fields.put("http-header-from", ConfUtils.getString(getConf(), "http.agent.email")); - 
String robotsTxtParser = "checked by crawler-commons " - + crawlercommons.CrawlerCommons.getVersion() - + " (https://github.com/crawler-commons/crawler-commons)"; + String robotsTxtParser = + "checked by crawler-commons " + + crawlercommons.CrawlerCommons.getVersion() + + " (https://github.com/crawler-commons/crawler-commons)"; fields.put("robots", robotsTxtParser); fields.put("format", "WARC File Format 1.1"); - fields.put("conformsTo", "https://iipc.github.io/warc-specifications/specifications/warc-format/warc-1.1/"); + fields.put( + "conformsTo", + "https://iipc.github.io/warc-specifications/specifications/warc-format/warc-1.1/"); WARCHdfsBolt warcbolt = (WARCHdfsBolt) new WARCHdfsBolt(); warcbolt.withConfigKey("warc"); @@ -156,5 +161,4 @@ protected WARCHdfsBolt getWarcBolt(String filePrefix) { return warcbolt; } - } diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/FeedDetectorBolt.java b/src/main/java/org/commoncrawl/stormcrawler/news/FeedDetectorBolt.java index 6fed661..e607dc2 100644 --- a/src/main/java/org/commoncrawl/stormcrawler/news/FeedDetectorBolt.java +++ b/src/main/java/org/commoncrawl/stormcrawler/news/FeedDetectorBolt.java @@ -14,13 +14,11 @@ package org.commoncrawl.stormcrawler.news; import java.util.Map; - +import org.apache.http.HttpHeaders; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; -import org.slf4j.LoggerFactory; - import org.apache.stormcrawler.Constants; import org.apache.stormcrawler.Metadata; import org.apache.stormcrawler.bolt.FeedParserBolt; @@ -29,7 +27,7 @@ import org.apache.stormcrawler.parse.ParseFilters; import org.apache.stormcrawler.parse.ParseResult; import org.apache.stormcrawler.persistence.Status; -import org.apache.http.HttpHeaders; +import org.slf4j.LoggerFactory; /** Detect RSS and Atom feeds, but do not parse and extract links */ @SuppressWarnings("serial") @@ -41,7 +39,8 @@ public class 
FeedDetectorBolt extends FeedParserBolt { public static String[][] contentClues = {{"news + * ParserBolt for news * sitemaps. */ @SuppressWarnings("serial") @@ -76,24 +73,29 @@ public class NewsSiteMapParserBolt extends SiteMapParserBolt { // - pass "isSitemapNews" to status metadata public static enum SitemapType { - NEWS, INDEX, SITEMAP + NEWS, + INDEX, + SITEMAP } public static final String isSitemapNewsKey = "isSitemapNews"; public static final String isSitemapIndexKey = "isSitemapIndex"; + /** - * A sitemap (not necessarily a news sitemap) which is verified to contain links - * to news articles. Necessary to crawl news sites which provide a sitemap but - * neither a news feed or sitemap. + * A sitemap (not necessarily a news sitemap) which is verified to contain links to news + * articles. Necessary to crawl news sites which provide a sitemap but neither a news feed or + * sitemap. */ public static final String isSitemapVerifiedKey = "isSitemapVerified"; - private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(NewsSiteMapParserBolt.class); + private static final org.slf4j.Logger LOG = + LoggerFactory.getLogger(NewsSiteMapParserBolt.class); /* content clues for news sitemaps, sitemap indexes or any sitemaps */ public static String[][] contentClues; public static int contentCluesSitemapNewsMatchUpTo = -1; public static int contentCluesSitemapIndexMatchUpTo = -1; + static { int cluesSize = Namespace.NEWS.length + 1 + 1 + Namespace.SITEMAP_LEGACY.length; contentClues = new String[cluesSize][1]; @@ -127,7 +129,7 @@ public static enum SitemapType { private ReducedMetric averagedMetrics; - /** Delay in minutes used for scheduling sub-sitemaps **/ + /** Delay in minutes used for scheduling sub-sitemaps * */ private int scheduleSitemapsWithDelay = -1; @Override @@ -184,7 +186,10 @@ public void execute(Tuple tuple) { metadata.remove(isSitemapKey); } else { if (isSitemap) { - collector.emit(Constants.StatusStreamName, tuple, new Values(url, metadata, 
Status.FETCHED)); + collector.emit( + Constants.StatusStreamName, + tuple, + new Values(url, metadata, Status.FETCHED)); } else { // not a sitemap, just pass it on collector.emit(tuple, tuple.getValues()); @@ -210,7 +215,8 @@ public void execute(Tuple tuple) { metadata.setValue(Constants.STATUS_ERROR_SOURCE, "sitemap parsing"); metadata.setValue(Constants.STATUS_ERROR_MESSAGE, errorMessage); metadata.remove("numLinks"); - collector.emit(Constants.StatusStreamName, tuple, new Values(url, metadata, Status.ERROR)); + collector.emit( + Constants.StatusStreamName, tuple, new Values(url, metadata, Status.ERROR)); collector.ack(tuple); return; } @@ -260,7 +266,8 @@ public void execute(Tuple tuple) { metadata.setValue("numLinks", String.valueOf(outlinks.size())); // marking the main URL as successfully fetched - collector.emit(Constants.StatusStreamName, tuple, new Values(url, metadata, Status.FETCHED)); + collector.emit( + Constants.StatusStreamName, tuple, new Values(url, metadata, Status.FETCHED)); collector.ack(tuple); } @@ -307,7 +314,8 @@ protected AbstractSiteMap parseSiteMap( byte[] content, String contentType, Metadata parentMetadata, - List links) throws UnknownFormatException, IOException { + List links) + throws UnknownFormatException, IOException { SiteMapParser parser = new SiteMapParser(strictModeSitemaps, allowPartialSitemaps); parser.setStrictNamespace(true); @@ -357,14 +365,15 @@ protected AbstractSiteMap parseSiteMap( continue; } - Outlink ol = filterOutlink( - sURL, - target, - parentMetadata, - isSitemapKey, - "true", - isSitemapNewsKey, - "false"); + Outlink ol = + filterOutlink( + sURL, + target, + parentMetadata, + isSitemapKey, + "true", + isSitemapNewsKey, + "false"); if (ol == null) { continue; } @@ -372,7 +381,8 @@ protected AbstractSiteMap parseSiteMap( // add a delay if (this.scheduleSitemapsWithDelay > 0) { if (delay > 0) { - ol.getMetadata().setValue(DefaultScheduler.DELAY_METADATA, Integer.toString(delay)); + ol.getMetadata() + 
.setValue(DefaultScheduler.DELAY_METADATA, Integer.toString(delay)); } delay += this.scheduleSitemapsWithDelay; } @@ -391,7 +401,8 @@ protected AbstractSiteMap parseSiteMap( SiteMap sm = (SiteMap) siteMap; Collection sitemapURLs = sm.getSiteMapUrls(); Iterator iter = sitemapURLs.iterator(); - sitemap_urls : while (iter.hasNext()) { + sitemap_urls: + while (iter.hasNext()) { linksFound++; SiteMapURL smurl = iter.next(); // TODO handle priority in metadata @@ -447,28 +458,30 @@ protected AbstractSiteMap parseSiteMap( // skip href links duplicating sitemap URL continue; } - Outlink ol = filterOutlink( - sURL, - hrefStr, - parentMetadata, - isSitemapKey, - "false", - isSitemapNewsKey, - "false"); + Outlink ol = + filterOutlink( + sURL, + hrefStr, + parentMetadata, + isSitemapKey, + "false", + isSitemapNewsKey, + "false"); if (ol != null) { links.add(ol); } } } - Outlink ol = filterOutlink( - sURL, - target, - parentMetadata, - isSitemapKey, - "false", - isSitemapNewsKey, - "false"); + Outlink ol = + filterOutlink( + sURL, + target, + parentMetadata, + isSitemapKey, + "false", + isSitemapNewsKey, + "false"); if (ol == null) { continue; } @@ -476,7 +489,11 @@ protected AbstractSiteMap parseSiteMap( links.add(ol); LOG.debug("{} : [sitemap] {}", url, target); } - LOG.info("Sitemap (found {} links, {} skipped): {}", linksFound, linksSkippedNotRecentlyModified, url); + LOG.info( + "Sitemap (found {} links, {} skipped): {}", + linksFound, + linksSkippedNotRecentlyModified, + url); } return siteMap; @@ -487,16 +504,18 @@ protected AbstractSiteMap parseSiteMap( public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { super.prepare(stormConf, context, collector); sniffContent = ConfUtils.getBoolean(stormConf, "sitemap.sniffContent", false); - filterHoursSinceModified = ConfUtils.getInt(stormConf, "sitemap.filter.hours.since.modified", -1); + filterHoursSinceModified = + ConfUtils.getInt(stormConf, "sitemap.filter.hours.since.modified", -1); 
parseFilters = ParseFilters.fromConf(stormConf); int maxOffsetGuess = ConfUtils.getInt(stormConf, "sitemap.offset.guess", 1024); contentDetector = new ContentDetector(NewsSiteMapParserBolt.contentClues, maxOffsetGuess); rssContentDetector = new ContentDetector(FeedDetectorBolt.contentClues, maxOffsetGuess); - averagedMetrics = context.registerMetric( - "news_sitemap_average_processing_time", - new ReducedMetric(new MeanReducer()), - 30); - scheduleSitemapsWithDelay = ConfUtils.getInt(stormConf, "sitemap.schedule.delay", scheduleSitemapsWithDelay); + averagedMetrics = + context.registerMetric( + "news_sitemap_average_processing_time", + new ReducedMetric(new MeanReducer()), + 30); + scheduleSitemapsWithDelay = + ConfUtils.getInt(stormConf, "sitemap.schedule.delay", scheduleSitemapsWithDelay); } - } diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/PreFilterBolt.java b/src/main/java/org/commoncrawl/stormcrawler/news/PreFilterBolt.java index d4289c3..c068f94 100644 --- a/src/main/java/org/commoncrawl/stormcrawler/news/PreFilterBolt.java +++ b/src/main/java/org/commoncrawl/stormcrawler/news/PreFilterBolt.java @@ -3,7 +3,6 @@ import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.Map; - import org.apache.commons.lang3.StringUtils; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; @@ -19,13 +18,13 @@ import org.slf4j.LoggerFactory; /** - * Variant of the URLFilterBolt to go upstream of the fetching to catch anything - * before it goes further into the topology. If filtered, a URL gets an ERROR - * status. + * Variant of the URLFilterBolt to go upstream of the fetching to catch anything before it goes + * further into the topology. If filtered, a URL gets an ERROR status. 
*/ public class PreFilterBolt extends BaseRichBolt { - protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + protected static final Logger LOG = + LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private URLFilters urlFilters; @@ -82,5 +81,4 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll urlFilters = URLFilters.fromConf(stormConf); } } - } diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/PunycodeURLNormalizer.java b/src/main/java/org/commoncrawl/stormcrawler/news/PunycodeURLNormalizer.java index b192412..114477c 100644 --- a/src/main/java/org/commoncrawl/stormcrawler/news/PunycodeURLNormalizer.java +++ b/src/main/java/org/commoncrawl/stormcrawler/news/PunycodeURLNormalizer.java @@ -13,20 +13,18 @@ */ package org.commoncrawl.stormcrawler.news; +import com.fasterxml.jackson.databind.JsonNode; import java.net.IDN; import java.net.MalformedURLException; import java.net.URL; import java.util.Map; - import org.apache.stormcrawler.Metadata; import org.apache.stormcrawler.filtering.URLFilter; -import com.fasterxml.jackson.databind.JsonNode; public class PunycodeURLNormalizer extends URLFilter { @Override - public void configure(Map stormConf, JsonNode filterParams) { - } + public void configure(Map stormConf, JsonNode filterParams) {} private boolean isAscii(String str) { char[] chars = str.toCharArray(); @@ -50,11 +48,11 @@ public String filter(URL sourceUrl, Metadata sourceMetadata, String urlToFilter) if (hostName.equals(url.getHost())) { return urlToFilter; } - urlToFilter = new URL(url.getProtocol(), hostName, url.getPort(), url.getFile()).toString(); + urlToFilter = + new URL(url.getProtocol(), hostName, url.getPort(), url.getFile()).toString(); } catch (MalformedURLException e) { return null; } return urlToFilter; } - } diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/BootstrapTopology.java 
b/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/BootstrapTopology.java index d870bdb..5b46acf 100644 --- a/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/BootstrapTopology.java +++ b/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/BootstrapTopology.java @@ -1,45 +1,39 @@ /** - * Licensed to DigitalPebble Ltd under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * DigitalPebble licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * Licensed to DigitalPebble Ltd under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. + * DigitalPebble licenses this file to You under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. You may obtain a copy of the + * License at * - * http://www.apache.org/licenses/LICENSE-2.0 + *

http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and * limitations under the License. */ - package org.commoncrawl.stormcrawler.news.bootstrap; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields; -import org.commoncrawl.stormcrawler.news.CrawlTopology; -import org.commoncrawl.stormcrawler.news.FeedDetectorBolt; -import org.slf4j.LoggerFactory; - import org.apache.stormcrawler.ConfigurableTopology; import org.apache.stormcrawler.Constants; import org.apache.stormcrawler.bolt.FetcherBolt; import org.apache.stormcrawler.bolt.JSoupParserBolt; import org.apache.stormcrawler.bolt.URLFilterBolt; import org.apache.stormcrawler.bolt.URLPartitionerBolt; +import org.apache.stormcrawler.indexing.DummyIndexer; import org.apache.stormcrawler.opensearch.persistence.AggregationSpout; import org.apache.stormcrawler.opensearch.persistence.StatusUpdaterBolt; -import org.apache.stormcrawler.indexing.DummyIndexer; import org.apache.stormcrawler.spout.FileSpout; import org.apache.stormcrawler.util.ConfUtils; import org.apache.stormcrawler.util.URLStreamGrouping; import org.apache.stormcrawler.warc.WARCHdfsBolt; +import org.commoncrawl.stormcrawler.news.CrawlTopology; +import org.commoncrawl.stormcrawler.news.FeedDetectorBolt; +import org.slf4j.LoggerFactory; -/** - * Dummy topology to play with the spouts and bolts on ElasticSearch - */ +/** Dummy topology to play with the spouts and bolts on ElasticSearch */ public class BootstrapTopology extends CrawlTopology { private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(BootstrapTopology.class); @@ -52,9 +46,15 @@ public static void main(String[] args) throws Exception { protected int run(String[] args) { TopologyBuilder builder = new TopologyBuilder(); - 
LOG.debug("sitemap.sniffContent: {}", ConfUtils.getBoolean(getConf(), "sitemap.sniffContent", false)); - LOG.info("sitemap.sniffContent: {}", ConfUtils.getBoolean(getConf(), "sitemap.sniffContent", false)); - LOG.warn("sitemap.sniffContent: {}", ConfUtils.getBoolean(getConf(), "sitemap.sniffContent", false)); + LOG.debug( + "sitemap.sniffContent: {}", + ConfUtils.getBoolean(getConf(), "sitemap.sniffContent", false)); + LOG.info( + "sitemap.sniffContent: {}", + ConfUtils.getBoolean(getConf(), "sitemap.sniffContent", false)); + LOG.warn( + "sitemap.sniffContent: {}", + ConfUtils.getBoolean(getConf(), "sitemap.sniffContent", false)); int numWorkers = ConfUtils.getInt(getConf(), "topology.workers", 1); @@ -68,18 +68,23 @@ protected int run(String[] args) { builder.setSpout("filespout", new FileSpout(args[0], args[1], true)); Fields key = new Fields("url"); - builder.setBolt("filter", new URLFilterBolt()).fieldsGrouping("filespout", Constants.StatusStreamName, key); + builder.setBolt("filter", new URLFilterBolt()) + .fieldsGrouping("filespout", Constants.StatusStreamName, key); } builder.setSpout("spout", new AggregationSpout(), numShards); - builder.setBolt("partitioner", new URLPartitionerBolt(), numWorkers).shuffleGrouping("spout"); + builder.setBolt("partitioner", new URLPartitionerBolt(), numWorkers) + .shuffleGrouping("spout"); - builder.setBolt("fetch", new FetcherBolt(), numWorkers).fieldsGrouping("partitioner", new Fields("key")); + builder.setBolt("fetch", new FetcherBolt(), numWorkers) + .fieldsGrouping("partitioner", new Fields("key")); - builder.setBolt("sitemap", new NewsSiteMapDetectorBolt(), numWorkers).localOrShuffleGrouping("fetch"); + builder.setBolt("sitemap", new NewsSiteMapDetectorBolt(), numWorkers) + .localOrShuffleGrouping("fetch"); - builder.setBolt("feed", new FeedDetectorBolt(), numWorkers).localOrShuffleGrouping("sitemap"); + builder.setBolt("feed", new FeedDetectorBolt(), numWorkers) + .localOrShuffleGrouping("sitemap"); 
builder.setBolt("parse", new JSoupParserBolt()).localOrShuffleGrouping("feed"); diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/FeedLinkParseFilter.java b/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/FeedLinkParseFilter.java index 6ac664d..bb50291 100644 --- a/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/FeedLinkParseFilter.java +++ b/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/FeedLinkParseFilter.java @@ -14,20 +14,18 @@ package org.commoncrawl.stormcrawler.news.bootstrap; import java.util.ArrayList; - -import org.slf4j.LoggerFactory; -import org.w3c.dom.DocumentFragment; - import org.apache.stormcrawler.bolt.FeedParserBolt; import org.apache.stormcrawler.parse.Outlink; import org.apache.stormcrawler.parse.ParseResult; import org.apache.stormcrawler.parse.filter.LinkParseFilter; +import org.slf4j.LoggerFactory; +import org.w3c.dom.DocumentFragment; /** - * ParseFilter which extracts exclusively RSS links via Xpath, all other links - * are skipped. See {@link LinkParseFilter} how to register and configure in - * parsefilters.json. A configuration snippet: - * + * ParseFilter which extracts exclusively RSS links via Xpath, all other links are skipped. See + * {@link LinkParseFilter} how to register and configure in parsefilters.json. A configuration + * snippet: + * *

  *     {
  *      "class": "org.commoncrawl.stormcrawler.news.bootstrap.FeedLinkParseFilter",
@@ -59,11 +57,8 @@ public void filter(String URL, byte[] content, DocumentFragment doc, ParseResult
 
     public static void logLinks(ParseResult parse, String URL, String message) {
         if (LOG.isDebugEnabled() && parse.getOutlinks().size() > 0) {
-            if (!message.isEmpty())
-                LOG.debug("{} for {}:", message, URL);
-            for (Outlink outlink : parse.getOutlinks())
-                LOG.debug(outlink.getTargetURL());
+            if (!message.isEmpty()) LOG.debug("{} for {}:", message, URL);
+            for (Outlink outlink : parse.getOutlinks()) LOG.debug(outlink.getTargetURL());
         }
     }
-
 }
diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/NewsSiteMapDetectorBolt.java b/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/NewsSiteMapDetectorBolt.java
index 2e1011b..f0af134 100644
--- a/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/NewsSiteMapDetectorBolt.java
+++ b/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/NewsSiteMapDetectorBolt.java
@@ -14,15 +14,10 @@
 package org.commoncrawl.stormcrawler.news.bootstrap;
 
 import java.util.Map;
-
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
-import org.commoncrawl.stormcrawler.news.ContentDetector;
-import org.commoncrawl.stormcrawler.news.NewsSiteMapParserBolt;
-import org.slf4j.LoggerFactory;
-
 import org.apache.stormcrawler.Constants;
 import org.apache.stormcrawler.Metadata;
 import org.apache.stormcrawler.bolt.SiteMapParserBolt;
@@ -31,20 +26,23 @@
 import org.apache.stormcrawler.parse.ParseFilters;
 import org.apache.stormcrawler.parse.ParseResult;
 import org.apache.stormcrawler.persistence.Status;
+import org.commoncrawl.stormcrawler.news.ContentDetector;
+import org.commoncrawl.stormcrawler.news.NewsSiteMapParserBolt;
+import org.slf4j.LoggerFactory;
 
 /**
- * Detector for news
+ * Detector for news
  * sitemaps and also sitemaps.
  */
 @SuppressWarnings("serial")
 public class NewsSiteMapDetectorBolt extends SiteMapParserBolt {
 
-    private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(NewsSiteMapDetectorBolt.class);
+    private static final org.slf4j.Logger LOG =
+            LoggerFactory.getLogger(NewsSiteMapDetectorBolt.class);
 
     protected static final int maxOffsetContentGuess = 1024;
-    private static ContentDetector contentDetector = new ContentDetector(NewsSiteMapParserBolt.contentClues,
-            maxOffsetContentGuess);
+    private static ContentDetector contentDetector =
+            new ContentDetector(NewsSiteMapParserBolt.contentClues, maxOffsetContentGuess);
 
     private ParseFilter parseFilters;
 
@@ -56,7 +54,8 @@ public void execute(Tuple tuple) {
         String url = tuple.getStringByField("url");
 
         boolean isSitemap = Boolean.valueOf(metadata.getFirstValue(SiteMapParserBolt.isSitemapKey));
-        boolean isNewsSitemap = Boolean.valueOf(metadata.getFirstValue(NewsSiteMapParserBolt.isSitemapNewsKey));
+        boolean isNewsSitemap =
+                Boolean.valueOf(metadata.getFirstValue(NewsSiteMapParserBolt.isSitemapNewsKey));
 
         if (!isNewsSitemap || !isSitemap) {
             int match = contentDetector.getFirstMatch(content);
@@ -79,7 +78,8 @@ public void execute(Tuple tuple) {
             parseData.setMetadata(metadata);
             parseFilters.filter(url, content, null, parse);
             // emit status
-            collector.emit(Constants.StatusStreamName, tuple, new Values(url, metadata, Status.FETCHED));
+            collector.emit(
+                    Constants.StatusStreamName, tuple, new Values(url, metadata, Status.FETCHED));
         } else {
             // pass on
             collector.emit(tuple, tuple.getValues());
@@ -93,5 +93,4 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll
         super.prepare(stormConf, context, collect);
         parseFilters = ParseFilters.fromConf(stormConf);
     }
-
 }
diff --git a/src/test/java/org/commoncrawl/stormcrawler/FastURLFilterTest.java b/src/test/java/org/commoncrawl/stormcrawler/FastURLFilterTest.java
index aa05f36..1931aca 100644
--- a/src/test/java/org/commoncrawl/stormcrawler/FastURLFilterTest.java
+++ b/src/test/java/org/commoncrawl/stormcrawler/FastURLFilterTest.java
@@ -14,21 +14,19 @@
  */
 package org.commoncrawl.stormcrawler;
 
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.HashMap;
 import java.util.Map;
-
+import org.apache.stormcrawler.Metadata;
+import org.apache.stormcrawler.filtering.URLFilter;
 import org.commoncrawl.stormcrawler.filter.FastURLFilter;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import org.apache.stormcrawler.Metadata;
-import org.apache.stormcrawler.filtering.URLFilter;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
 public class FastURLFilterTest {
 
     protected static URLFilter filter;
diff --git a/src/test/java/org/commoncrawl/stormcrawler/news/NewsSiteMapParserTest.java b/src/test/java/org/commoncrawl/stormcrawler/news/NewsSiteMapParserTest.java
index db0fedd..b0a0d5a 100644
--- a/src/test/java/org/commoncrawl/stormcrawler/news/NewsSiteMapParserTest.java
+++ b/src/test/java/org/commoncrawl/stormcrawler/news/NewsSiteMapParserTest.java
@@ -16,6 +16,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 
+import crawlercommons.sitemaps.UnknownFormatException;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -25,17 +26,13 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.commons.io.IOUtils;
-import org.commoncrawl.stormcrawler.news.NewsSiteMapParserBolt.SitemapType;
-import org.junit.Before;
-import org.junit.Test;
-
 import org.apache.stormcrawler.Metadata;
 import org.apache.stormcrawler.parse.Outlink;
 import org.apache.stormcrawler.parse.ParsingTester;
-
-import crawlercommons.sitemaps.UnknownFormatException;
+import org.commoncrawl.stormcrawler.news.NewsSiteMapParserBolt.SitemapType;
+import org.junit.Before;
+import org.junit.Test;
 
 public class NewsSiteMapParserTest extends ParsingTester {
 
@@ -60,7 +57,8 @@ public void testSiteMapParser() throws IOException, UnknownFormatException {
         SitemapType type = ((NewsSiteMapParserBolt) bolt).detectContent(url, content);
         assertEquals(SitemapType.NEWS, type);
 
-        ((NewsSiteMapParserBolt) bolt).parseSiteMap(url, content, contentType, parentMetadata, links);
+        ((NewsSiteMapParserBolt) bolt)
+                .parseSiteMap(url, content, contentType, parentMetadata, links);
 
         // unmodified sitemap:
         // - publication date is far in the past, link should be skipped
@@ -69,14 +67,17 @@ public void testSiteMapParser() throws IOException, UnknownFormatException {
 
         // now set the publication date to yesterday
         LocalDateTime yesterday = LocalDateTime.now().minusDays(1);
-        content = (new String(content, StandardCharsets.UTF_8))
-                .replace(
-                        "2008-12-23",
-                        ""
-                                + yesterday.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))
-                                + "")
-                .getBytes(StandardCharsets.UTF_8);
-        ((NewsSiteMapParserBolt) bolt).parseSiteMap(url, content, contentType, parentMetadata, links);
+        content =
+                (new String(content, StandardCharsets.UTF_8))
+                        .replace(
+                                "2008-12-23",
+                                ""
+                                        + yesterday.format(
+                                                DateTimeFormatter.ofPattern("yyyy-MM-dd"))
+                                        + "")
+                        .getBytes(StandardCharsets.UTF_8);
+        ((NewsSiteMapParserBolt) bolt)
+                .parseSiteMap(url, content, contentType, parentMetadata, links);
 
         assertEquals(
                 "Expected one  and one additional  link - image links are ignored",
@@ -95,8 +96,13 @@ public void testFeedWithSitemapNamespace() throws IOException, UnknownFormatExce
         String url = "https://example.org/feed.xml";
         byte[] content = readContent("feed-with-sitemap-namespace.xml");
         SitemapType type = ((NewsSiteMapParserBolt) bolt).detectContent(url, content);
-        assertNotEquals("RSS feed with sitemap namespace should not be detected as sitemap", SitemapType.NEWS, type);
-        assertNotEquals("RSS feed with sitemap namespace should not be detected as sitemap", SitemapType.SITEMAP, type);
+        assertNotEquals(
+                "RSS feed with sitemap namespace should not be detected as sitemap",
+                SitemapType.NEWS,
+                type);
+        assertNotEquals(
+                "RSS feed with sitemap namespace should not be detected as sitemap",
+                SitemapType.SITEMAP,
+                type);
     }
-
 }

From 07287d744ee90ed3b12e7dec9531e237e5f59126 Mon Sep 17 00:00:00 2001
From: Luca Foppiano 
Date: Tue, 16 Jun 2026 18:22:42 +0100
Subject: [PATCH 08/10] chore: reformat license comments for consistency

Signed-off-by: Luca Foppiano 
---
 .../stormcrawler/filter/FastURLFilter.java    | 22 ++++++++++--------
 .../stormcrawler/news/CrawlTopology.java      | 22 ++++++++++--------
 .../news/bootstrap/BootstrapTopology.java     | 23 +++++++++++--------
 .../stormcrawler/FastURLFilterTest.java       |  2 +-
 4 files changed, 38 insertions(+), 31 deletions(-)

diff --git a/src/main/java/org/commoncrawl/stormcrawler/filter/FastURLFilter.java b/src/main/java/org/commoncrawl/stormcrawler/filter/FastURLFilter.java
index 0b2b04d..9b2ee32 100644
--- a/src/main/java/org/commoncrawl/stormcrawler/filter/FastURLFilter.java
+++ b/src/main/java/org/commoncrawl/stormcrawler/filter/FastURLFilter.java
@@ -1,15 +1,17 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * 

http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - *

Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing permissions and + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and * limitations under the License. */ package org.commoncrawl.stormcrawler.filter; diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java b/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java index 9368d2c..cc2d85a 100644 --- a/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java +++ b/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java @@ -1,15 +1,17 @@ -/** - * Licensed to DigitalPebble Ltd under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. - * DigitalPebble licenses this file to You under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. You may obtain a copy of the - * License at +/* + * Licensed to DigitalPebble Ltd under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * DigitalPebble licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - *

http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - *

Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing permissions and + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and * limitations under the License. */ package org.commoncrawl.stormcrawler.news; diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/BootstrapTopology.java b/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/BootstrapTopology.java index 5b46acf..14f45ff 100644 --- a/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/BootstrapTopology.java +++ b/src/main/java/org/commoncrawl/stormcrawler/news/bootstrap/BootstrapTopology.java @@ -1,17 +1,20 @@ -/** - * Licensed to DigitalPebble Ltd under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. - * DigitalPebble licenses this file to You under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. You may obtain a copy of the - * License at +/* + * Licensed to DigitalPebble Ltd under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * DigitalPebble licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - *

http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - *

Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing permissions and + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and * limitations under the License. */ + package org.commoncrawl.stormcrawler.news.bootstrap; import org.apache.storm.topology.TopologyBuilder; diff --git a/src/test/java/org/commoncrawl/stormcrawler/FastURLFilterTest.java b/src/test/java/org/commoncrawl/stormcrawler/FastURLFilterTest.java index 1931aca..58aaa04 100644 --- a/src/test/java/org/commoncrawl/stormcrawler/FastURLFilterTest.java +++ b/src/test/java/org/commoncrawl/stormcrawler/FastURLFilterTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to DigitalPebble Ltd under one or more contributor license agreements. See the NOTICE * file distributed with this work for additional information regarding copyright ownership. 
* DigitalPebble licenses this file to You under the Apache License, Version 2.0 (the "License"); From 0d0606ddfbfeea9348393e3ff8f2e33495d704ba Mon Sep 17 00:00:00 2001 From: Luca Foppiano Date: Tue, 16 Jun 2026 18:23:40 +0100 Subject: [PATCH 09/10] chore: add missing licence header Signed-off-by: Luca Foppiano --- .../stormcrawler/news/PreFilterBolt.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/PreFilterBolt.java b/src/main/java/org/commoncrawl/stormcrawler/news/PreFilterBolt.java index c068f94..18106c3 100644 --- a/src/main/java/org/commoncrawl/stormcrawler/news/PreFilterBolt.java +++ b/src/main/java/org/commoncrawl/stormcrawler/news/PreFilterBolt.java @@ -1,3 +1,17 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.commoncrawl.stormcrawler.news; import java.io.IOException; From 3d13c257314198088e6be0ad01b417d79684fbb1 Mon Sep 17 00:00:00 2001 From: Luca Foppiano Date: Tue, 16 Jun 2026 18:28:17 +0100 Subject: [PATCH 10/10] docs: add developer formatting instructions to README Signed-off-by: Luca Foppiano --- README.md | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/README.md b/README.md index 6ac88e3..874da21 100644 --- a/README.md +++ b/README.md @@ -119,3 +119,18 @@ NewsCrawl ACTIVE 48 1 146 NewsCrawl-1 $> storm kill NewsCrawl ``` +## Note for developers + +Please format your code before submitting a PR with + +``` +mvn git-code-format:format-code -Dgcf.globPattern="**/*" -Dskip.format.code=false +``` + +You can enable pre-commit format hooks by running: + +``` +mvn clean install -Dskip.format.code=false +``` + +