Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions src/java/org/commoncrawl/util/WarcCdxWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,24 @@

import java.io.IOException;
import java.io.OutputStream;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.TimeZone;

import org.apache.commons.io.output.CountingOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringUtils;
import org.apache.nutch.metadata.Metadata;
import org.apache.nutch.net.URLNormalizers;
import org.apache.nutch.net.protocols.Response;
import org.apache.nutch.protocol.Content;
import org.archive.url.WaybackURLKeyMaker;
Expand All @@ -57,6 +61,7 @@ public class WarcCdxWriter extends WarcWriter {
private SimpleDateFormat timestampFormat;
private ObjectWriter jsonWriter;
private WaybackURLKeyMaker surtKeyMaker = new WaybackURLKeyMaker(true);
private URLNormalizers urlNormalizersRedirect;

/**
* JSON indentation identical to that used by Python WayBack
Expand All @@ -81,17 +86,18 @@ public void writeObjectEntrySeparator(JsonGenerator jg)
}

/**
 * Creates a writer that sends WARC records to {@code warcOut} and the
 * accompanying CDX lines to {@code cdxOut}.
 *
 * @param warcOut
 *          stream receiving the WARC records; it is wrapped in a
 *          {@link CountingOutputStream} before being handed to the
 *          superclass constructor
 * @param cdxOut
 *          stream receiving the CDX output
 * @param warcFilePath
 *          path of the WARC file; the path component of its URI, with a
 *          single leading slash removed, is kept as the WARC filename
 * @param redirectNormalizers
 *          URL normalizers stored for normalizing redirect targets; may be
 *          {@code null}, in which case redirect targets are not normalized
 */
public WarcCdxWriter(OutputStream warcOut, OutputStream cdxOut,
Path warcFilePath) {
Path warcFilePath, URLNormalizers redirectNormalizers) {
super(new CountingOutputStream(warcOut));
// the superclass stores the wrapped stream in this.out; keep a typed
// reference to the counting wrapper
countingOut = (CountingOutputStream) this.out;
this.cdxOut = cdxOut;
timestampFormat = new SimpleDateFormat("yyyyMMddHHmmss");
// 14-digit timestamp, formatted with an explicit locale and in UTC
timestampFormat = new SimpleDateFormat("yyyyMMddHHmmss", Locale.ROOT);
timestampFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
// strip one leading slash from the URI path of the WARC file
warcFilename = warcFilePath.toUri().getPath().replaceFirst("^/", "");
ObjectMapper jsonMapper = new ObjectMapper();
// escape all non-ASCII characters in the generated JSON
jsonMapper.getFactory().configure(JsonGenerator.Feature.ESCAPE_NON_ASCII,
true);
jsonWriter = jsonMapper.writer(new JsonIndenter());
urlNormalizersRedirect = redirectNormalizers;
}

@Override
Expand Down Expand Up @@ -125,6 +131,19 @@ public URI writeWarcResponseRecord(final URI targetUri, final String ip,
String redirectLocation = null;
if (isRedirect(httpStatusCode)) {
redirectLocation = getMeta(content.getMetadata(), "Location");
if (redirectLocation != null) {
try {
// convert redirects from relative to absolute URLs
redirectLocation = new URL(targetUri.toURL(), redirectLocation).toString();
if (urlNormalizersRedirect != null) {
// normalize the redirect target URL
redirectLocation = urlNormalizersRedirect.normalize(redirectLocation,
URLNormalizers.SCOPE_FETCHER);
}
} catch (MalformedURLException e) {
redirectLocation = null;
}
}
}
writeCdxLine(targetUri, date, offset, length, payloadDigest, content, false,
redirectLocation, truncated);
Expand Down
24 changes: 17 additions & 7 deletions src/java/org/commoncrawl/util/WarcRecordWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ class WarcRecordWriter extends RecordWriter<Text, WarcCapture> {
int maxContent = Integer.MAX_VALUE;
private String precedingURL = ""; // for deduplication
private URLNormalizers urlNormalizers;
private URLNormalizers urlNormalizersRedirect;

public WarcRecordWriter(Configuration conf, Path outputPath, int partition,
TaskAttemptContext context) throws IOException {
Expand All @@ -124,8 +125,8 @@ public WarcRecordWriter(Configuration conf, Path outputPath, int partition,
FileSystem fs = outputPath.getFileSystem(conf);

SimpleDateFormat fileDate = new SimpleDateFormat("yyyyMMddHHmmss",
Locale.US);
fileDate.setTimeZone(TimeZone.getTimeZone("GMT"));
Locale.ROOT);
fileDate.setTimeZone(TimeZone.getTimeZone("UTC"));

String prefix = conf.get("warc.export.prefix", "NUTCH-CRAWL");

Expand Down Expand Up @@ -167,6 +168,12 @@ public WarcRecordWriter(Configuration conf, Path outputPath, int partition,
skipByContent = true;
}
urlNormalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_INDEXER);
if (generateCdx) {
// URL normalizers to normalize the redirect targets/locations put into
// the CDX index
urlNormalizersRedirect = new URLNormalizers(conf,
URLNormalizers.SCOPE_FETCHER);
}

Path warcPath = new Path(new Path(outputPath, "warc"), filename);
warcOut = fs.create(warcPath);
Expand All @@ -176,7 +183,8 @@ public WarcRecordWriter(Configuration conf, Path outputPath, int partition,
conf.get("warc.export.cdx.path", outputPath.toString()));
cdxOut = openCdxOutputStream(new Path(cdxPath, "warc"), filename, conf);
}
warcWriter = openWarcWriter(warcPath, warcOut, cdxOut);
warcWriter = openWarcWriter(warcPath, warcOut, cdxOut,
urlNormalizersRedirect);
warcinfoId = warcWriter.writeWarcinfoRecord(filename, hostname, publisher,
operator, software, isPartOf, description, captureStartDate);

Expand All @@ -188,7 +196,9 @@ public WarcRecordWriter(Configuration conf, Path outputPath, int partition,
crawlDiagnosticsCdxOut = openCdxOutputStream(
new Path(cdxPath, "crawldiagnostics"), filename, conf);
}
crawlDiagnosticsWarcWriter = openWarcWriter(crawlDiagnosticsWarcPath, crawlDiagnosticsWarcOut,crawlDiagnosticsCdxOut);
crawlDiagnosticsWarcWriter = openWarcWriter(crawlDiagnosticsWarcPath,
crawlDiagnosticsWarcOut, crawlDiagnosticsCdxOut,
urlNormalizersRedirect);
crawlDiagnosticsWarcinfoId = crawlDiagnosticsWarcWriter
.writeWarcinfoRecord(filename, hostname, publisher, operator,
software, isPartOf, description, captureStartDate);
Expand All @@ -203,7 +213,7 @@ public WarcRecordWriter(Configuration conf, Path outputPath, int partition,
filename, conf);
}
robotsTxtWarcWriter = openWarcWriter(robotsTxtWarcPath, robotsTxtWarcOut,
robotsTxtCdxOut);
robotsTxtCdxOut, urlNormalizersRedirect);
robotsTxtWarcinfoId = robotsTxtWarcWriter.writeWarcinfoRecord(filename,
hostname, publisher, operator, software, isPartOf, description,
captureStartDate);
Expand Down Expand Up @@ -497,9 +507,9 @@ protected static String canonicalizeIP(String ip) {
}

/**
 * Chooses the WARC writer implementation depending on whether a CDX output
 * stream is available.
 *
 * @param warcPath
 *          path of the WARC file, passed on to the CDX-writing variant only
 * @param warcOut
 *          stream receiving the WARC records
 * @param cdxOut
 *          stream receiving the CDX output, or {@code null} if no CDX
 *          output is written
 * @param redirectNormalizers
 *          URL normalizers passed on to the CDX-writing variant only
 * @return a {@link WarcCdxWriter} if {@code cdxOut} is not {@code null},
 *         otherwise a plain {@link WarcWriter}
 */
private WarcWriter openWarcWriter(Path warcPath, DataOutputStream warcOut,
DataOutputStream cdxOut) {
DataOutputStream cdxOut, URLNormalizers redirectNormalizers) {
if (cdxOut != null) {
return new WarcCdxWriter(warcOut, cdxOut, warcPath);
return new WarcCdxWriter(warcOut, cdxOut, warcPath, redirectNormalizers);
}
// no CDX stream: fall back to the writer without CDX support
return new WarcWriter(warcOut);
}
Expand Down
4 changes: 2 additions & 2 deletions src/java/org/commoncrawl/util/WarcWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ public void end() {

/**
 * Creates a WARC writer on top of the given output stream.
 *
 * @param out
 *          stream receiving the WARC records; stored both as the original
 *          stream and as the current output stream
 */
public WarcWriter(final OutputStream out) {
this.origOut = this.out = out;
isoDate = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
isoDate.setTimeZone(TimeZone.getTimeZone("GMT"));
// the 'Z' in the pattern is a quoted literal, so the formatter's time zone
// is set explicitly to match it
isoDate = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'", Locale.ROOT);
isoDate.setTimeZone(TimeZone.getTimeZone("UTC"));
}

/**
Expand Down