33import java .io .DataOutputStream ;
44import java .io .IOException ;
55import java .lang .invoke .MethodHandles ;
6+ import java .net .InetAddress ;
67import java .net .URI ;
78import java .net .URISyntaxException ;
9+ import java .net .UnknownHostException ;
810import java .nio .charset .StandardCharsets ;
911import java .security .MessageDigest ;
1012import java .security .NoSuchAlgorithmException ;
@@ -69,20 +71,23 @@ protected static class WarcRecordWriter extends RecordWriter<Text, WarcCompleteD
6971 private boolean generateCrawlDiagnostics ;
7072 private boolean generateRobotsTxt ;
7173 private boolean generateCdx ;
74+ private String lastURL = "" ; // for deduplication
75+ private boolean deduplicate ;
7276
7377 public WarcRecordWriter (Configuration conf , Path outputPath ,
74- String filename , String hostname ,
75- String publisher , String operator , String software , String isPartOf ,
76- String description ,
78+ String filename , String hostname , String publisher , String operator ,
79+ String software , String isPartOf , String description ,
7780 boolean generateCrawlDiagnostics , boolean generateRobotsTxt ,
78- boolean generateCdx , Path cdxPath , Date captureStartDate )
79- throws IOException {
81+ boolean generateCdx , Path cdxPath , Date captureStartDate ,
82+ boolean deduplicate ) throws IOException {
8083
8184 FileSystem fs = outputPath .getFileSystem (conf );
8285
8386 Path warcPath = new Path (new Path (outputPath , "warc" ), filename );
8487 warcOut = fs .create (warcPath );
8588
89+ this .deduplicate = deduplicate ;
90+
8691 this .generateCdx = generateCdx ;
8792 if (generateCdx ) {
8893 cdxOut = openCdxOutputStream (new Path (cdxPath , "warc" ), filename , conf );
@@ -169,8 +174,9 @@ protected static DataOutputStream openCdxOutputStream(Path cdxPath,
169174 public synchronized void write (Text key , WarcCompleteData value ) throws IOException {
170175 URI targetUri ;
171176
177+ String url = value .url .toString ();
172178 try {
173- targetUri = new URI (value . url . toString () );
179+ targetUri = new URI (url );
174180 } catch (URISyntaxException e ) {
175181 LOG .error ("Cannot write WARC record, invalid URI: {}" , value .url );
176182 return ;
@@ -181,6 +187,14 @@ public synchronized void write(Text key, WarcCompleteData value) throws IOExcept
181187 return ;
182188 }
183189
190+ if (deduplicate ) {
191+ if (lastURL .equals (url )) {
192+ LOG .info ("Skipping duplicate record: {}" , value .url );
193+ return ;
194+ }
195+ lastURL = url ;
196+ }
197+
184198 String ip = "0.0.0.0" ;
185199 Date date = null ;
186200 boolean notModified = false ;
@@ -456,7 +470,7 @@ public RecordWriter<Text, WarcCompleteData> getRecordWriter(
456470 e .getMessage ());
457471 }
458472
459- String hostname = conf .get ("warc.export.hostname" , "localhost" );
473+ String hostname = conf .get ("warc.export.hostname" , getHostname () );
460474 String publisher = conf .get ("warc.export.publisher" , null );
461475 String operator = conf .get ("warc.export.operator" , null );
462476 String software = conf .get ("warc.export.software" , null );
@@ -465,6 +479,7 @@ public RecordWriter<Text, WarcCompleteData> getRecordWriter(
465479 boolean generateCrawlDiagnostics = conf .getBoolean ("warc.export.crawldiagnostics" , false );
466480 boolean generateRobotsTxt = conf .getBoolean ("warc.export.robotstxt" , false );
467481 boolean generateCdx = conf .getBoolean ("warc.export.cdx" , false );
482+ boolean deduplicate = conf .getBoolean ("warc.deduplicate" , false );
468483
469484 // WARC recommends - Prefix-Timestamp-Serial-Crawlhost.warc.gz
470485 // https://github.com/iipc/warc-specifications/blob/gh-pages/specifications/warc-format/warc-1.1/index.md#annex-b-informative-warc-file-size-and-name-recommendations
@@ -482,7 +497,7 @@ public RecordWriter<Text, WarcCompleteData> getRecordWriter(
482497
483498 return new WarcRecordWriter (conf , outputPath , filename , hostname , publisher ,
484499 operator , software , isPartOf , description , generateCrawlDiagnostics ,
485- generateRobotsTxt , generateCdx , cdxPath , captureStartDate );
500+ generateRobotsTxt , generateCdx , cdxPath , captureStartDate , deduplicate );
486501 }
487502
488503 @ Override
@@ -512,4 +527,14 @@ public void checkOutputSpecs(JobContext job) throws FileAlreadyExistsException,
512527 TokenCache .obtainTokensForNamenodes (job .getCredentials (),
513528 new Path [] { outDir }, job .getConfiguration ());
514529 }
530+
531+ public String getHostname () {
532+ try {
533+ return InetAddress .getLocalHost ().getHostName ();
534+ } catch (UnknownHostException e ) {
535+ LOG .warn ("Failed to get hostname: {}" , e .getMessage ());
536+ }
537+ return "localhost" ;
538+ }
539+
515540}
0 commit comments