From 63dafc185fd3f7daf94ceca00aa4eaff35bb5892 Mon Sep 17 00:00:00 2001 From: Julien Nioche Date: Wed, 13 Dec 2023 10:54:32 +0000 Subject: [PATCH 1/2] Have as many WARCBolt instances as there are workers, fix #64 Signed-off-by: Julien Nioche --- .../java/org/commoncrawl/stormcrawler/news/CrawlTopology.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java b/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java index 15f4074..8b7d54c 100644 --- a/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java +++ b/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java @@ -94,7 +94,7 @@ protected int run(String[] args) { // take it from feed default output so that the feed files themselves // don't get included - unless we want them too of course! - builder.setBolt("warc", warcbolt).localOrShuffleGrouping("feed"); + builder.setBolt("warc", warcbolt, numWorkers).localOrShuffleGrouping("feed"); BoltDeclarer statusBolt = builder.setBolt("status", new StatusUpdaterBolt(), numWorkers) .localOrShuffleGrouping("fetch", Constants.StatusStreamName) From efd0d2407acac5d7970cd915dbf114f76ccb024f Mon Sep 17 00:00:00 2001 From: Julien Nioche Date: Wed, 13 Dec 2023 11:23:15 +0000 Subject: [PATCH 2/2] Route tuples to the status updater bolt based on URLs,fixes #65 Signed-off-by: Julien Nioche --- conf/crawler.flux | 15 ++++++++++----- .../stormcrawler/news/CrawlTopology.java | 14 +++++++++----- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/conf/crawler.flux b/conf/crawler.flux index c116f0a..1d221c2 100644 --- a/conf/crawler.flux +++ b/conf/crawler.flux @@ -172,31 +172,36 @@ streams: - from: "prefilter" to: "status" grouping: - type: LOCAL_OR_SHUFFLE + type: FIELDS + args: ["url"] streamId: "status" - from: "fetcher" to: "status" grouping: - type: LOCAL_OR_SHUFFLE + type: FIELDS + args: ["url"] streamId: "status" - from: "sitemap" to: "status" grouping: - type: LOCAL_OR_SHUFFLE + type: FIELDS + args: ["url"] streamId: "status" - from: "feed" to: "status" grouping: - type: LOCAL_OR_SHUFFLE + type: FIELDS + args: ["url"] streamId: "status" - from: "ssbolt" to: "status" grouping: - type: LOCAL_OR_SHUFFLE + type: FIELDS + args: ["url"] streamId: "status" # part of the topology used to inject seeds diff --git a/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java b/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java index 8b7d54c..bfba4b7 100644 --- a/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java +++ b/src/main/java/org/commoncrawl/stormcrawler/news/CrawlTopology.java @@ -95,16 +95,20 @@ protected int run(String[] args) { // take it from feed default output so that the feed files themselves // don't get included - unless we want them too of course! builder.setBolt("warc", warcbolt, numWorkers).localOrShuffleGrouping("feed"); + + final Fields furl = new Fields("url"); BoltDeclarer statusBolt = builder.setBolt("status", new StatusUpdaterBolt(), numWorkers) - .localOrShuffleGrouping("fetch", Constants.StatusStreamName) - .localOrShuffleGrouping("sitemap", Constants.StatusStreamName) - .localOrShuffleGrouping("feed", Constants.StatusStreamName) - .localOrShuffleGrouping("ssb", Constants.StatusStreamName) - .localOrShuffleGrouping("prefilter", Constants.StatusStreamName).setNumTasks(numShards); + .fieldsGrouping("fetch", Constants.StatusStreamName, furl) + .fieldsGrouping("sitemap", Constants.StatusStreamName, furl) + .fieldsGrouping("feed", Constants.StatusStreamName, furl) + .fieldsGrouping("ssb", Constants.StatusStreamName, furl) + .fieldsGrouping("prefilter", Constants.StatusStreamName, furl); + if (args.length >= 2) { statusBolt.customGrouping("filter", Constants.StatusStreamName, new URLStreamGrouping()); } + statusBolt.setNumTasks(numShards); return submit(conf, builder); }