Skip to content

Commit ccc558a

Browse files
Improved WARC export of Fetcher:
- optionally deduplicate records (duplicates caused by redirects) - set WARC capture time to fetch start/end time
1 parent 3ac2744 commit ccc558a

4 files changed

Lines changed: 61 additions & 22 deletions

File tree

src/java/org/apache/nutch/fetcher/Fetcher.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -678,7 +678,6 @@ public FetcherThread(Configuration conf) {
678678
outlinksIgnoreExternal = conf.getBoolean("fetcher.follow.outlinks.ignore.external", false);
679679
maxOutlinkDepthNumLinks = conf.getInt("fetcher.follow.outlinks.num.links", 4);
680680
outlinksDepthDivisor = conf.getInt("fetcher.follow.outlinks.depth.divisor", 2);
681-
storingWarc = conf.getBoolean("fetcher.store.warc", false);
682681
storing404s = conf.getBoolean("fetcher.store.404s", false);
683682

684683
if (conf.getBoolean("fetcher.store.robotstxt", false)) {
@@ -1253,6 +1252,7 @@ public void configure(JobConf job) {
12531252
this.segmentName = job.get(Nutch.SEGMENT_NAME_KEY);
12541253
this.storingContent = isStoringContent(job);
12551254
this.parsing = isParsing(job);
1255+
this.storingWarc = isStoringWarc(job);
12561256

12571257
// if (job.getBoolean("fetcher.verbose", false)) {
12581258
// LOG.setLevel(Level.FINE);
@@ -1269,6 +1269,10 @@ public static boolean isStoringContent(Configuration conf) {
12691269
return conf.getBoolean("fetcher.store.content", true);
12701270
}
12711271

1272+
public static boolean isStoringWarc(Configuration conf) {
1273+
return conf.getBoolean("fetcher.store.warc", true);
1274+
}
1275+
12721276
public void run(RecordReader<Text, CrawlDatum> input,
12731277
OutputCollector<Text, NutchWritable> output,
12741278
Reporter reporter) throws IOException {

src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@
1818
package org.apache.nutch.fetcher;
1919

2020
import java.io.IOException;
21+
import java.text.SimpleDateFormat;
22+
import java.util.Date;
23+
import java.util.Locale;
24+
import java.util.TimeZone;
2125

2226
import org.apache.nutch.crawl.CrawlDatum;
2327
import org.apache.nutch.crawl.NutchWritable;
@@ -44,6 +48,7 @@
4448
import org.commoncrawl.warc.WarcCompleteData;
4549
import org.commoncrawl.warc.WarcOutputFormat;
4650

51+
4752
/** Splits FetcherOutput entries into multiple map files. */
4853
public class FetcherOutputFormat implements OutputFormat<Text, NutchWritable> {
4954

@@ -93,9 +98,24 @@ public RecordWriter<Text, NutchWritable> getRecordWriter(final FileSystem fs,
9398
parseOut = new ParseOutputFormat().getRecordWriter(fs, job, name, progress);
9499
}
95100

96-
if (true) { // TODO: writeWarc
97-
Path warc = new Path(
98-
new Path(FileOutputFormat.getOutputPath(job), "warc"), name);
101+
if (Fetcher.isStoringWarc(job)) {
102+
Path warc = new Path(
103+
new Path(FileOutputFormat.getOutputPath(job), "warc"), name);
104+
// set start and end time of WARC capture
105+
long timelimit = job.getLong("fetcher.timelimit", -1);
106+
long timelimitMins = job.getLong("fetcher.timelimit.mins", -1);
107+
long startTime = System.currentTimeMillis();
108+
long endTime = System.currentTimeMillis();
109+
if (timelimitMins > 0) {
110+
startTime = timelimit - (timelimitMins * 60 * 1000);
111+
if (endTime > timelimit) {
112+
endTime = timelimit;
113+
}
114+
}
115+
SimpleDateFormat fileDate = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US);
116+
fileDate.setTimeZone(TimeZone.getTimeZone("GMT"));
117+
job.set("warc.export.date", fileDate.format(new Date(startTime)));
118+
job.set("warc.export.date.end", fileDate.format(new Date(endTime)));
99119
warcOut = new WarcOutputFormat().getRecordWriter(job, warc);
100120
}
101121
}

src/java/org/commoncrawl/tools/WarcExport.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -109,23 +109,13 @@ public static class ParseDataCombinedInputFormat extends CombineSequenceFileInpu
109109
public static class CrawlDatumCombinedInputFormat extends CombineSequenceFileInputFormat<Text, CrawlDatum> {
110110
}
111111

112-
public String getHostname() {
113-
try {
114-
return InetAddress.getLocalHost().getHostName();
115-
} catch (UnknownHostException e) {
116-
LOG.warn("Failed to get hostname: {}", e.getMessage());
117-
}
118-
return "localhost";
119-
}
120-
121112
public void export(Path outputDir, List<Path> segments,
122113
boolean generateCrawlDiagnostics, boolean generateRobotsTxt, Path cdxPath)
123114
throws IOException {
124115
Configuration conf = getConf();
125116

126117
// We compress ourselves, so this isn't necessary
127118
conf.setBoolean(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.COMPRESS, false);
128-
conf.set("warc.export.hostname", getHostname());
129119

130120
conf.setBoolean("warc.export.crawldiagnostics", generateCrawlDiagnostics);
131121
conf.setBoolean("warc.export.robotstxt", generateRobotsTxt);

src/java/org/commoncrawl/warc/WarcOutputFormat.java

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
import java.io.DataOutputStream;
44
import java.io.IOException;
55
import java.lang.invoke.MethodHandles;
6+
import java.net.InetAddress;
67
import java.net.URI;
78
import java.net.URISyntaxException;
9+
import java.net.UnknownHostException;
810
import java.nio.charset.StandardCharsets;
911
import java.security.MessageDigest;
1012
import java.security.NoSuchAlgorithmException;
@@ -69,20 +71,23 @@ protected static class WarcRecordWriter extends RecordWriter<Text, WarcCompleteD
6971
private boolean generateCrawlDiagnostics;
7072
private boolean generateRobotsTxt;
7173
private boolean generateCdx;
74+
private String lastURL = ""; // for deduplication
75+
private boolean deduplicate;
7276

7377
public WarcRecordWriter(Configuration conf, Path outputPath,
74-
String filename, String hostname,
75-
String publisher, String operator, String software, String isPartOf,
76-
String description,
78+
String filename, String hostname, String publisher, String operator,
79+
String software, String isPartOf, String description,
7780
boolean generateCrawlDiagnostics, boolean generateRobotsTxt,
78-
boolean generateCdx, Path cdxPath, Date captureStartDate)
79-
throws IOException {
81+
boolean generateCdx, Path cdxPath, Date captureStartDate,
82+
boolean deduplicate) throws IOException {
8083

8184
FileSystem fs = outputPath.getFileSystem(conf);
8285

8386
Path warcPath = new Path(new Path(outputPath, "warc"), filename);
8487
warcOut = fs.create(warcPath);
8588

89+
this.deduplicate = deduplicate;
90+
8691
this.generateCdx = generateCdx;
8792
if (generateCdx) {
8893
cdxOut = openCdxOutputStream(new Path(cdxPath, "warc"), filename, conf);
@@ -169,8 +174,9 @@ protected static DataOutputStream openCdxOutputStream(Path cdxPath,
169174
public synchronized void write(Text key, WarcCompleteData value) throws IOException {
170175
URI targetUri;
171176

177+
String url = value.url.toString();
172178
try {
173-
targetUri = new URI(value.url.toString());
179+
targetUri = new URI(url);
174180
} catch (URISyntaxException e) {
175181
LOG.error("Cannot write WARC record, invalid URI: {}", value.url);
176182
return;
@@ -181,6 +187,14 @@ public synchronized void write(Text key, WarcCompleteData value) throws IOExcept
181187
return;
182188
}
183189

190+
if (deduplicate) {
191+
if (lastURL.equals(url)) {
192+
LOG.info("Skipping duplicate record: {}", value.url);
193+
return;
194+
}
195+
lastURL = url;
196+
}
197+
184198
String ip = "0.0.0.0";
185199
Date date = null;
186200
boolean notModified = false;
@@ -456,7 +470,7 @@ public RecordWriter<Text, WarcCompleteData> getRecordWriter(
456470
e.getMessage());
457471
}
458472

459-
String hostname = conf.get("warc.export.hostname", "localhost");
473+
String hostname = conf.get("warc.export.hostname", getHostname());
460474
String publisher = conf.get("warc.export.publisher", null);
461475
String operator = conf.get("warc.export.operator", null);
462476
String software = conf.get("warc.export.software", null);
@@ -465,6 +479,7 @@ public RecordWriter<Text, WarcCompleteData> getRecordWriter(
465479
boolean generateCrawlDiagnostics = conf.getBoolean("warc.export.crawldiagnostics", false);
466480
boolean generateRobotsTxt = conf.getBoolean("warc.export.robotstxt", false);
467481
boolean generateCdx= conf.getBoolean("warc.export.cdx", false);
482+
boolean deduplicate = conf.getBoolean("warc.deduplicate", false);
468483

469484
// WARC recommends - Prefix-Timestamp-Serial-Crawlhost.warc.gz
470485
// https://github.com/iipc/warc-specifications/blob/gh-pages/specifications/warc-format/warc-1.1/index.md#annex-b-informative-warc-file-size-and-name-recommendations
@@ -482,7 +497,7 @@ public RecordWriter<Text, WarcCompleteData> getRecordWriter(
482497

483498
return new WarcRecordWriter(conf, outputPath, filename, hostname, publisher,
484499
operator, software, isPartOf, description, generateCrawlDiagnostics,
485-
generateRobotsTxt, generateCdx, cdxPath, captureStartDate);
500+
generateRobotsTxt, generateCdx, cdxPath, captureStartDate, deduplicate);
486501
}
487502

488503
@Override
@@ -512,4 +527,14 @@ public void checkOutputSpecs(JobContext job) throws FileAlreadyExistsException,
512527
TokenCache.obtainTokensForNamenodes(job.getCredentials(),
513528
new Path[] { outDir }, job.getConfiguration());
514529
}
530+
531+
public String getHostname() {
532+
try {
533+
return InetAddress.getLocalHost().getHostName();
534+
} catch (UnknownHostException e) {
535+
LOG.warn("Failed to get hostname: {}", e.getMessage());
536+
}
537+
return "localhost";
538+
}
539+
515540
}

0 commit comments

Comments
 (0)