diff --git a/src/java/org/apache/nutch/crawl/SitemapInjector.java b/src/java/org/apache/nutch/crawl/SitemapInjector.java index b643e3368a..a8a9152afe 100644 --- a/src/java/org/apache/nutch/crawl/SitemapInjector.java +++ b/src/java/org/apache/nutch/crawl/SitemapInjector.java @@ -37,6 +37,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang3.time.StopWatch; import org.apache.hadoop.conf.Configuration; @@ -75,9 +76,12 @@ import crawlercommons.sitemaps.SiteMapIndex; import crawlercommons.sitemaps.SiteMapParser; import crawlercommons.sitemaps.SiteMapURL; +import crawlercommons.sitemaps.extension.Extension; +import crawlercommons.sitemaps.extension.ExtensionMetadata; +import crawlercommons.sitemaps.extension.LinkAttributes; /** - * Inject URLs from sitemaps (http://www.sitemaps.org/). + * Inject URLs from sitemaps (https://www.sitemaps.org/). * * Sitemap URLs are given same way as "ordinary" seeds URLs - one URL per line. * Each URL points to one of @@ -86,16 +90,16 @@ *
  • plain text sitemap (possibly compressed)
  • *
  • sitemap index (XML)
  • *
  • and all - * other - * formats supported by the Sitemap parser of - * crawler-commons.
  • + * other + * formats supported by the Sitemap parser of crawler-commons. * * *

    * All sitemap URLs on the input path are fetched and the URLs contained in the - * sitemaps are "injected" into CrawlDb. If a sitemap specifies modification + * sitemaps are "injected" into the CrawlDb. If a sitemap specifies modification * time, refresh rate, and/or priority for a page, these values are stored in - * CrawlDb but adjusted so that they fit into global limits. E.g., + * the CrawlDb but adjusted so that they fit into global limits. E.g., * *

      * <changefreq>yearly</changefreq>
    @@ -113,24 +117,44 @@
      * Fetching sitemaps is done by Nutch protocol plugins to make use of special
      * settings, e.g., HTTP proxy configurations.
      *
    - * The behavior how entries in CrawlDb are overwritten by injected entries does
    - * not differ from {@link Injector}.
    + * The way entries in the CrawlDb are overwritten by injected entries
    + * does not differ from {@link Injector}. However, it is possible to run
    + * SitemapInjector in two steps:
    + * 
      + *
    1. Step 1: Extract URLs from sitemaps, store the URLs in a new CrawlDb.
    2. + *
    3. Step 2: Inject URLs from the CrawlDb created in Step 1 into another + * CrawlDb.
    4. + *
    + * + *

    Specifics and Limitations

    * - *

    Limitations

    - * - *

    - * SitemapInjector does not support: + * SitemapInjector does not support: *

    + * + * The following features are implemented: + * - *

    + * */ public class SitemapInjector extends Injector { @@ -194,12 +218,14 @@ public void setup(Context context) { protocolFactory = new ProtocolFactory(conf); - // SiteMapParser to allow "cross submits" from different prefixes - // (up to last slash), cf. http://www.sitemaps.org/protocol.html#location - // strict = true : do not allow cross submits. - // This would need to pass a set of cross-submit allowed hosts beforehand - // which is not supported by the sitemap parser. Done in SitemapInjector, - // see below. + /* + * SiteMapParser to allow "cross submits" from different prefixes (up to + * the last slash), cf. https://www.sitemaps.org/protocol.html#location. + * + * strict = true : do not allow cross submits. This would need to pass a + * set of cross-submit allowed hosts beforehand which is not supported by + * the sitemap parser. Done in SitemapInjector, see below. + */ boolean strict = conf.getBoolean("db.injector.sitemap.strict", false); sitemapParser = new SiteMapParser(strict, true); sitemapParser.setStrictNamespace(true); @@ -209,6 +235,8 @@ public void setup(Context context) { .addAcceptedNamespace(crawlercommons.sitemaps.Namespace.NEWS); sitemapParser .addAcceptedNamespace(crawlercommons.sitemaps.Namespace.EMPTY); + // enable support for localized links in sitemaps + sitemapParser.enableExtension(Extension.LINKS); maxRecursiveSitemaps = conf.getInt("db.injector.sitemap.index_max_size", 50001); @@ -223,15 +251,24 @@ public void setup(Context context) { checkCrossSubmitsType = CrossSubmitType .valueOf(conf.get(SITEMAP_CROSS_SUBMIT_CHECK_TYPE, "PRIVATE_DOMAIN")); - // make sure a sitemap is entirely, even recursively processed within 80% - // of the task timeout, do not start processing a subsitemap if fetch - // and parsing time may hit the task timeout - int taskTimeout = conf.getInt("mapreduce.task.timeout", 900000) / 1000; + /* + * Make sure a sitemap is entirely, even recursively processed within 80% + * of the task timeout, do not start 
processing a subsitemap if fetch and + * parsing time may hit the task timeout + */ + int taskTimeout = conf.getInt("mapreduce.task.timeout", 900000); + LOG.info("mapreduce.task.timeout = {} ms", taskTimeout); + taskTimeout /= 1000; // now in seconds + LOG.info("http.time.limit = {} seconds", + conf.getInt("http.time.limit", 120)); maxSitemapFetchTime = (int) (conf.getInt("http.time.limit", 120) * 1.5); maxSitemapProcessingTime = taskTimeout - (2 * maxSitemapFetchTime); - if ((taskTimeout * .8) < maxSitemapProcessingTime) { + if ((taskTimeout * .8) < maxSitemapProcessingTime + || maxSitemapProcessingTime < 1) { maxSitemapProcessingTime = (int) (taskTimeout * .8); } + LOG.info("Max. sitemap processing time: {} seconds", + maxSitemapProcessingTime); maxFailuresPerHost = conf .getInt("db.injector.sitemap.max.fetch.failures.per.host", 5); @@ -246,9 +283,11 @@ public void setup(Context context) { maxInterval = conf.getInt("db.fetch.interval.max", 365 * 24 * 3600); } - // Sitemaps can be quite large, so it is desirable to - // increase the content limits above defaults (64kB): - // TODO: make configurable? 
+ /* + * Sitemaps can be quite large, so it is desirable to increase the content + * limits above defaults (1 MiB) to the 50 MiB specified in the sitemaps + * protocol: + */ String[] contentLimitProperties = { "http.content.limit", "ftp.content.limit", "file.content.limit" }; for (int i = 0; i < contentLimitProperties.length; i++) { @@ -861,8 +900,9 @@ public void injectURLs(SiteMap sitemap) } } - int crossSubmitsRejected = 0; - int hostLimitRejected = 0; + AtomicLong crossSubmitsRejected = new AtomicLong(0); + AtomicLong hostLimitRejected = new AtomicLong(0); + for (SiteMapURL siteMapURL : sitemapURLs) { if (totalUrls >= maxUrls) { @@ -881,7 +921,7 @@ public void injectURLs(SiteMap sitemap) } } - // TODO: score and fetch interval should be transparently overridable + // TODO: score and fetch interval should be transparently overridden float sitemapScore = (float) siteMapURL.getPriority(); sitemapScore *= customScore; int sitemapInterval = getChangeFrequencySeconds( @@ -890,91 +930,119 @@ public void injectURLs(SiteMap sitemap) if (siteMapURL.getLastModified() != null) { lastModified = siteMapURL.getLastModified().getTime(); } - URL u = siteMapURL.getUrl(); - String url = u.toString(); - if (url.length() > maxUrlLength) { - LOG.warn( - "Skipping overlong URL: {} ... 
(truncated, length = {} characters)", - url.substring(0, maxUrlLength), url.length()); - continue; - } - // for simplicity do host and domain checks before normalization - String host = u.getHost(); - if (injectedHosts.size() >= maxHosts - && !injectedHosts.contains(host)) { - hostLimitRejected++; - context - .getCounter(NutchMetrics.GROUP_SITEMAP_INJECTOR, - NutchMetrics.SITEMAP_URLS_SKIPPED_HOST_LIMIT_REACHED_TOTAL) - .increment(1); - continue; - } - if (checkCrossSubmits) { - String crossSubmit = host; - if (checkCrossSubmitsType == CrossSubmitType.PRIVATE_DOMAIN) { - crossSubmit = EffectiveTldFinder.getAssignedDomain(host, false, - false); - } else if (checkCrossSubmitsType == CrossSubmitType.PUBLIC_DOMAIN) { - crossSubmit = EffectiveTldFinder.getAssignedDomain(host, false, - true); - } - if (crossSubmit == null || !crossSubmits.contains(crossSubmit)) { - crossSubmitsRejected++; - context.getCounter(NutchMetrics.GROUP_SITEMAP_INJECTOR, - NutchMetrics.SITEMAP_URLS_SKIPPED_NOT_ALLOWED_BY_CROSS_SUBMITS_TOTAL) - .increment(1); - continue; - } - } - try { - url = filterNormalize(url); - } catch (Exception e) { - LOG.warn("Skipping {}:", url, e); - url = null; - } - if (url == null) { - context - .getCounter(NutchMetrics.GROUP_SITEMAP_INJECTOR, - NutchMetrics.SITEMAP_URLS_FROM_REJECTED_BY_URL_FILTERS) - .increment(1); - } else { - // URL passed normalizers and filters - totalUrls++; - Text value = new Text(url); - CrawlDatum datum = new CrawlDatum(CrawlDatum.STATUS_INJECTED, - sitemapInterval, sitemapScore); - if (lastModified != -1) { - // datum.setModifiedTime(lastModified); - } - datum.setFetchTime(curTime); - try { - scfilters.injectedScore(value, datum); - } catch (ScoringFilterException e) { - LOG.warn( - "Cannot filter injected score for url {}, using default ({})", - url, e.getMessage()); + injectURL(siteMapURL.getUrl(), sitemapScore, sitemapInterval, lastModified, + crossSubmitsRejected, hostLimitRejected); + + /* + * Inject localized links if there are any. 
See + * + * and + * + */ + ExtensionMetadata[] linkAttrs = siteMapURL + .getAttributesForExtension(Extension.LINKS); + if (linkAttrs != null) { + for (ExtensionMetadata attr : linkAttrs) { + LinkAttributes linkAttr = (LinkAttributes) attr; + URL href = linkAttr.getHref(); + if (href != null) { + injectURL(href, sitemapScore, sitemapInterval, lastModified, + crossSubmitsRejected, hostLimitRejected); + context.getCounter(NutchMetrics.GROUP_SITEMAP_INJECTOR, + "sitemap_extension_localized_link").increment(1); + } } - - context.getCounter(NutchMetrics.GROUP_SITEMAP_INJECTOR, - NutchMetrics.SITEMAP_URLS_INJECTED).increment(1); - context.write(value, datum); - injectedHosts.add(host); } + } - if (crossSubmitsRejected > 0) { + if (crossSubmitsRejected.get() > 0) { LOG.info("Rejected {} cross-submits for {} ({})", - crossSubmitsRejected, sitemap.getUrl(), + crossSubmitsRejected.get(), sitemap.getUrl(), sitemap.getType().toString()); + context.getCounter(NutchMetrics.GROUP_SITEMAP_INJECTOR, + NutchMetrics.SITEMAP_URLS_SKIPPED_NOT_ALLOWED_BY_CROSS_SUBMITS_TOTAL) + .increment(crossSubmitsRejected.get()); } - if (hostLimitRejected > 0) { + if (hostLimitRejected.get() > 0) { LOG.info( "Rejected {} URLs because max. number of linked hosts is reached for {} ({})", - hostLimitRejected, sitemap.getUrl(), + hostLimitRejected.get(), sitemap.getUrl(), sitemap.getType().toString()); + context + .getCounter(NutchMetrics.GROUP_SITEMAP_INJECTOR, + NutchMetrics.SITEMAP_URLS_SKIPPED_HOST_LIMIT_REACHED_TOTAL) + .increment(hostLimitRejected.get()); } } + public void injectURL(URL u, float sitemapScore, int sitemapInterval, + long lastModified, AtomicLong crossSubmitsRejected, AtomicLong hostLimitRejected) throws IOException, InterruptedException { + String url = u.toString(); + if (url.length() > maxUrlLength) { + LOG.warn( + "Skipping overlong URL: {} ... 
(truncated, length = {} characters)", + url.substring(0, maxUrlLength), url.length()); + return; + } + + // for simplicity do host and domain checks before normalization + String host = u.getHost(); + if (injectedHosts.size() >= maxHosts && !injectedHosts.contains(host)) { + hostLimitRejected.incrementAndGet(); + return; + } + + if (checkCrossSubmits) { + String crossSubmit = host; + if (checkCrossSubmitsType == CrossSubmitType.PRIVATE_DOMAIN) { + crossSubmit = EffectiveTldFinder.getAssignedDomain(host, false, + false); + } else if (checkCrossSubmitsType == CrossSubmitType.PUBLIC_DOMAIN) { + crossSubmit = EffectiveTldFinder.getAssignedDomain(host, false, + true); + } + if (crossSubmit == null || !crossSubmits.contains(crossSubmit)) { + crossSubmitsRejected.incrementAndGet(); + return; + } + } + try { + url = filterNormalize(url); + } catch (Exception e) { + LOG.warn("Skipping {}:", url, e); + url = null; + } + if (url == null) { + context + .getCounter(NutchMetrics.GROUP_SITEMAP_INJECTOR, + NutchMetrics.SITEMAP_URLS_FROM_REJECTED_BY_URL_FILTERS) + .increment(1); + } else { + // URL passed normalizers and filters + totalUrls++; + Text value = new Text(url); + CrawlDatum datum = new CrawlDatum(CrawlDatum.STATUS_INJECTED, + sitemapInterval, sitemapScore); + if (lastModified != -1) { + // datum.setModifiedTime(lastModified); + } + datum.setFetchTime(curTime); + + try { + scfilters.injectedScore(value, datum); + } catch (ScoringFilterException e) { + LOG.warn( + "Cannot filter injected score for url {}, using default ({})", + url, e.getMessage()); + } + + context.getCounter(NutchMetrics.GROUP_SITEMAP_INJECTOR, + NutchMetrics.SITEMAP_URLS_INJECTED).increment(1); + context.write(value, datum); + injectedHosts.add(host); + } + } } /**