Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
286 changes: 177 additions & 109 deletions src/java/org/apache/nutch/crawl/SitemapInjector.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.lang3.time.StopWatch;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -75,9 +76,12 @@
import crawlercommons.sitemaps.SiteMapIndex;
import crawlercommons.sitemaps.SiteMapParser;
import crawlercommons.sitemaps.SiteMapURL;
import crawlercommons.sitemaps.extension.Extension;
import crawlercommons.sitemaps.extension.ExtensionMetadata;
import crawlercommons.sitemaps.extension.LinkAttributes;

/**
* Inject URLs from sitemaps (http://www.sitemaps.org/).
* Inject URLs from sitemaps (https://www.sitemaps.org/).
*
* Sitemap URLs are given same way as "ordinary" seeds URLs - one URL per line.
* Each URL points to one of
Expand All @@ -86,16 +90,16 @@
* <li>plain text sitemap (possibly compressed)</li>
* <li>sitemap index (XML)</li>
* <li>and all
* <a href="http://www.sitemaps.org/protocol.html#otherformats">other
* formats</a> supported by the Sitemap parser of
* <a href="http://code.google.com/p/crawler-commons/">crawler-commons</a>.</li>
* <a href="https://www.sitemaps.org/protocol.html#otherformats">other
* formats</a> supported by the Sitemap parser of <a href=
* "https://github.com/crawler-commons/crawler-commons/">crawler-commons</a>.</li>
* </ul>
*
* <p>
* All sitemap URLs on the input path are fetched and the URLs contained in the
* sitemaps are "injected" into CrawlDb. If a sitemap specifies modification
* sitemaps are "injected" into the CrawlDb. If a sitemap specifies modification
* time, refresh rate, and/or priority for a page, these values are stored in
* CrawlDb but adjusted so that they fit into global limits. E.g.,
* the CrawlDb but adjusted so that they fit into global limits. E.g.,
*
* <pre>
* &lt;changefreq&gt;yearly&lt;/changefreq&gt;
Expand All @@ -113,24 +117,44 @@
* Fetching sitemaps is done by Nutch protocol plugins to make use of special
* settings, e.g., HTTP proxy configurations.
*
* The behavior how entries in CrawlDb are overwritten by injected entries does
* not differ from {@link Injector}.
* The behavior how entries in the CrawlDb are overwritten by injected entries
* does not differ from {@link Injector}. However, it is possible to run
* SitemapInjector in two steps:
* <ol>
* <li>Step 1: Extract URLs from sitemaps, store the URLs in a new CrawlDb.</li>
* <li>Step 2: Inject URLs from the CrawlDb created in Step 1 into another
* CrawlDb.</li>
* </ol>
*
* <h2>Specifics and Limitations</h2>
*
* <h2>Limitations</h2>
*
* <p>
* SitemapInjector does not support:
* SitemapInjector does <b>not</b> support:
* <ul>
* <li>[done/implemented] follow redirects</li>
* <li>no retry scheduling if fetching a sitemap fails</li>
* <li>be polite and add delays between fetching sitemaps. Usually, there is
* only one sitemap per host, so this does not matter that much.</li>
* <li>[done/implemented] check for
* &quot;<a href="http://www.sitemaps.org/protocol.html#location">cross
* <li>Retry scheduling if fetching a sitemap fails.</li>
* <li>Guarantee polite delays between fetching sitemaps from the same host.
* Usually, there is only one sitemap per host, so this does not matter that
* much. But it should be made sure that the input list of sitemap URLs does not
* contain multiple or many sitemaps from hosted on the same system.</li>
* </ul>
*
* The following features are implemented:
* <ul>
* <li>Respect robots.txt rules: do not access sitemaps disallowed per
* robots.txt</li>
* <li>Apply URL filters and normalization rules to URLs of sitemaps and URLs
* listed in sitemaps.</li>
* <li>Follow redirects.</li>
* <li>Check for
* &quot;<a href="https://www.sitemaps.org/protocol.html#location">cross
* submits</a>&quot;: if a sitemap URL is explicitly given it is assumed the
* sitemap's content is trustworthy</li>
* sitemap's content is trustworthy.</li>
* <li>Configure multiple limits on sitemap fetching and processing, to avoid
* that the sitemap processing is overloaded, get stuck, or too many URLs are
* emitted. See
* {@link SitemapInjector.SitemapInjectMapper.SitemapProcessor#processSitemap(AbstractSiteMap, Set, int)}
* for more details.</li>
* </ul>
* </p>
*
*/
public class SitemapInjector extends Injector {

Expand Down Expand Up @@ -194,12 +218,14 @@ public void setup(Context context) {

protocolFactory = new ProtocolFactory(conf);

// SiteMapParser to allow "cross submits" from different prefixes
// (up to last slash), cf. http://www.sitemaps.org/protocol.html#location
// strict = true : do not allow cross submits.
// This would need to pass a set of cross-submit allowed hosts beforehand
// which is not supported by the sitemap parser. Done in SitemapInjector,
// see below.
/*
* SiteMapParser to allow "cross submits" from different prefixes (up to
* the last slash), cf. https://www.sitemaps.org/protocol.html#location.
*
* strict = true : do not allow cross submits. This would need to pass a
* set of cross-submit allowed hosts beforehand which is not supported by
* the sitemap parser. Done in SitemapInjector, see below.
*/
boolean strict = conf.getBoolean("db.injector.sitemap.strict", false);
sitemapParser = new SiteMapParser(strict, true);
sitemapParser.setStrictNamespace(true);
Expand All @@ -209,6 +235,8 @@ public void setup(Context context) {
.addAcceptedNamespace(crawlercommons.sitemaps.Namespace.NEWS);
sitemapParser
.addAcceptedNamespace(crawlercommons.sitemaps.Namespace.EMPTY);
// enable support for localized links in sitemaps
sitemapParser.enableExtension(Extension.LINKS);

maxRecursiveSitemaps = conf.getInt("db.injector.sitemap.index_max_size",
50001);
Expand All @@ -223,15 +251,24 @@ public void setup(Context context) {
checkCrossSubmitsType = CrossSubmitType
.valueOf(conf.get(SITEMAP_CROSS_SUBMIT_CHECK_TYPE, "PRIVATE_DOMAIN"));

// make sure a sitemap is entirely, even recursively processed within 80%
// of the task timeout, do not start processing a subsitemap if fetch
// and parsing time may hit the task timeout
int taskTimeout = conf.getInt("mapreduce.task.timeout", 900000) / 1000;
/*
* Make sure a sitemap is entirely, even recursively processed within 80%
* of the task timeout, do not start processing a subsitemap if fetch and
* parsing time may hit the task timeout
*/
int taskTimeout = conf.getInt("mapreduce.task.timeout", 900000);
LOG.info("mapreduce.task.timeout = {} ms", taskTimeout);
taskTimeout /= 1000; // now in seconds
LOG.info("http.time.limit = {} seconds",
conf.getInt("http.time.limit", 120));
maxSitemapFetchTime = (int) (conf.getInt("http.time.limit", 120) * 1.5);
maxSitemapProcessingTime = taskTimeout - (2 * maxSitemapFetchTime);
if ((taskTimeout * .8) < maxSitemapProcessingTime) {
if ((taskTimeout * .8) < maxSitemapProcessingTime
|| maxSitemapProcessingTime < 1) {
maxSitemapProcessingTime = (int) (taskTimeout * .8);
}
LOG.info("Max. sitemap processing time: {} seconds",
maxSitemapProcessingTime);
maxFailuresPerHost = conf
.getInt("db.injector.sitemap.max.fetch.failures.per.host", 5);

Expand All @@ -246,9 +283,11 @@ public void setup(Context context) {
maxInterval = conf.getInt("db.fetch.interval.max", 365 * 24 * 3600);
}

// Sitemaps can be quite large, so it is desirable to
// increase the content limits above defaults (64kB):
// TODO: make configurable?
/*
* Sitemaps can be quite large, so it is desirable to increase the content
* limits above defaults (1 MiB) to the 50 MiB specified in the sitemaps
* protocol:
*/
String[] contentLimitProperties = { "http.content.limit",
"ftp.content.limit", "file.content.limit" };
for (int i = 0; i < contentLimitProperties.length; i++) {
Expand Down Expand Up @@ -861,8 +900,9 @@ public void injectURLs(SiteMap sitemap)
}
}

int crossSubmitsRejected = 0;
int hostLimitRejected = 0;
AtomicLong crossSubmitsRejected = new AtomicLong(0);
AtomicLong hostLimitRejected = new AtomicLong(0);

for (SiteMapURL siteMapURL : sitemapURLs) {

if (totalUrls >= maxUrls) {
Expand All @@ -881,7 +921,7 @@ public void injectURLs(SiteMap sitemap)
}
}

// TODO: score and fetch interval should be transparently overridable
// TODO: score and fetch interval should be transparently overridden
float sitemapScore = (float) siteMapURL.getPriority();
sitemapScore *= customScore;
int sitemapInterval = getChangeFrequencySeconds(
Expand All @@ -890,91 +930,119 @@ public void injectURLs(SiteMap sitemap)
if (siteMapURL.getLastModified() != null) {
lastModified = siteMapURL.getLastModified().getTime();
}
URL u = siteMapURL.getUrl();
String url = u.toString();
if (url.length() > maxUrlLength) {
LOG.warn(
"Skipping overlong URL: {} ... (truncated, length = {} characters)",
url.substring(0, maxUrlLength), url.length());
continue;
}
// for simplicity do host and domain checks before normalization
String host = u.getHost();
if (injectedHosts.size() >= maxHosts
&& !injectedHosts.contains(host)) {
hostLimitRejected++;
context
.getCounter(NutchMetrics.GROUP_SITEMAP_INJECTOR,
NutchMetrics.SITEMAP_URLS_SKIPPED_HOST_LIMIT_REACHED_TOTAL)
.increment(1);
continue;
}
if (checkCrossSubmits) {
String crossSubmit = host;
if (checkCrossSubmitsType == CrossSubmitType.PRIVATE_DOMAIN) {
crossSubmit = EffectiveTldFinder.getAssignedDomain(host, false,
false);
} else if (checkCrossSubmitsType == CrossSubmitType.PUBLIC_DOMAIN) {
crossSubmit = EffectiveTldFinder.getAssignedDomain(host, false,
true);
}
if (crossSubmit == null || !crossSubmits.contains(crossSubmit)) {
crossSubmitsRejected++;
context.getCounter(NutchMetrics.GROUP_SITEMAP_INJECTOR,
NutchMetrics.SITEMAP_URLS_SKIPPED_NOT_ALLOWED_BY_CROSS_SUBMITS_TOTAL)
.increment(1);
continue;
}
}
try {
url = filterNormalize(url);
} catch (Exception e) {
LOG.warn("Skipping {}:", url, e);
url = null;
}
if (url == null) {
context
.getCounter(NutchMetrics.GROUP_SITEMAP_INJECTOR,
NutchMetrics.SITEMAP_URLS_FROM_REJECTED_BY_URL_FILTERS)
.increment(1);
} else {
// URL passed normalizers and filters
totalUrls++;
Text value = new Text(url);
CrawlDatum datum = new CrawlDatum(CrawlDatum.STATUS_INJECTED,
sitemapInterval, sitemapScore);
if (lastModified != -1) {
// datum.setModifiedTime(lastModified);
}
datum.setFetchTime(curTime);

try {
scfilters.injectedScore(value, datum);
} catch (ScoringFilterException e) {
LOG.warn(
"Cannot filter injected score for url {}, using default ({})",
url, e.getMessage());
injectURL(siteMapURL.getUrl(), sitemapScore, sitemapInterval, lastModified,
crossSubmitsRejected, hostLimitRejected);

/*
* Inject localized links if there are any. See
* <https://developers.google.com/search/docs/specialty/international/localized-versions>
* and
* <https://crawler-commons.github.io/crawler-commons/1.6/crawlercommons/sitemaps/extension/LinkAttributes.html>
*/
ExtensionMetadata[] linkAttrs = siteMapURL
.getAttributesForExtension(Extension.LINKS);
if (linkAttrs != null) {
for (ExtensionMetadata attr : linkAttrs) {
LinkAttributes linkAttr = (LinkAttributes) attr;
URL href = linkAttr.getHref();
if (href != null) {
injectURL(href, sitemapScore, sitemapInterval, lastModified,
crossSubmitsRejected, hostLimitRejected);
context.getCounter(NutchMetrics.GROUP_SITEMAP_INJECTOR,
"sitemap_extension_localized_link").increment(1);
}
}

context.getCounter(NutchMetrics.GROUP_SITEMAP_INJECTOR,
NutchMetrics.SITEMAP_URLS_INJECTED).increment(1);
context.write(value, datum);
injectedHosts.add(host);
}

}
if (crossSubmitsRejected > 0) {
if (crossSubmitsRejected.get() > 0) {
LOG.info("Rejected {} cross-submits for {} ({})",
crossSubmitsRejected, sitemap.getUrl(),
crossSubmitsRejected.get(), sitemap.getUrl(),
sitemap.getType().toString());
context.getCounter(NutchMetrics.GROUP_SITEMAP_INJECTOR,
NutchMetrics.SITEMAP_URLS_SKIPPED_NOT_ALLOWED_BY_CROSS_SUBMITS_TOTAL)
.increment(crossSubmitsRejected.get());
}
if (hostLimitRejected > 0) {
if (hostLimitRejected.get() > 0) {
LOG.info(
"Rejected {} URLs because max. number of linked hosts is reached for {} ({})",
hostLimitRejected, sitemap.getUrl(),
hostLimitRejected.get(), sitemap.getUrl(),
sitemap.getType().toString());
context
.getCounter(NutchMetrics.GROUP_SITEMAP_INJECTOR,
NutchMetrics.SITEMAP_URLS_SKIPPED_HOST_LIMIT_REACHED_TOTAL)
.increment(hostLimitRejected.get());
}
}

public void injectURL(URL u, float sitemapScore, int sitemapInterval,
long lastModified, AtomicLong crossSubmitsRejected, AtomicLong hostLimitRejected) throws IOException, InterruptedException {
String url = u.toString();
if (url.length() > maxUrlLength) {
LOG.warn(
"Skipping overlong URL: {} ... (truncated, length = {} characters)",
url.substring(0, maxUrlLength), url.length());
return;
}

// for simplicity do host and domain checks before normalization
String host = u.getHost();
if (injectedHosts.size() >= maxHosts && !injectedHosts.contains(host)) {
hostLimitRejected.incrementAndGet();
return;
}

if (checkCrossSubmits) {
String crossSubmit = host;
if (checkCrossSubmitsType == CrossSubmitType.PRIVATE_DOMAIN) {
crossSubmit = EffectiveTldFinder.getAssignedDomain(host, false,
false);
} else if (checkCrossSubmitsType == CrossSubmitType.PUBLIC_DOMAIN) {
crossSubmit = EffectiveTldFinder.getAssignedDomain(host, false,
true);
}
if (crossSubmit == null || !crossSubmits.contains(crossSubmit)) {
crossSubmitsRejected.incrementAndGet();
return;
}
}
try {
url = filterNormalize(url);
} catch (Exception e) {
LOG.warn("Skipping {}:", url, e);
url = null;
}
if (url == null) {
context
.getCounter(NutchMetrics.GROUP_SITEMAP_INJECTOR,
NutchMetrics.SITEMAP_URLS_FROM_REJECTED_BY_URL_FILTERS)
.increment(1);
} else {
// URL passed normalizers and filters
totalUrls++;
Text value = new Text(url);
CrawlDatum datum = new CrawlDatum(CrawlDatum.STATUS_INJECTED,
sitemapInterval, sitemapScore);
if (lastModified != -1) {
// datum.setModifiedTime(lastModified);
}
datum.setFetchTime(curTime);

try {
scfilters.injectedScore(value, datum);
} catch (ScoringFilterException e) {
LOG.warn(
"Cannot filter injected score for url {}, using default ({})",
url, e.getMessage());
}

context.getCounter(NutchMetrics.GROUP_SITEMAP_INJECTOR,
NutchMetrics.SITEMAP_URLS_INJECTED).increment(1);
context.write(value, datum);
injectedHosts.add(host);
}
}
}

/**
Expand Down
Loading