diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json new file mode 100644 index 0000000..9d10725 --- /dev/null +++ b/.devcontainer/devcontainer.json @@ -0,0 +1,27 @@ +// For format details, see https://aka.ms/devcontainer.json. For config options, see the +// README at: https://github.com/devcontainers/templates/tree/main/src/java +{ + "name": "Java", + // Or use a Dockerfile or Docker Compose file. More info: https://containers.dev/guide/dockerfile + "image": "mcr.microsoft.com/devcontainers/java:1-11-bookworm", + + "features": { + "ghcr.io/devcontainers/features/java:1": { + "version": "none", + "installMaven": "true", + "installGradle": "false" + } + } + + // Use 'forwardPorts' to make a list of ports inside the container available locally. + // "forwardPorts": [], + + // Use 'postCreateCommand' to run commands after the container is created. + // "postCreateCommand": "java -version", + + // Configure tool-specific properties. + // "customizations": {}, + + // Uncomment to connect as root instead. More info: https://aka.ms/dev-containers-non-root. + // "remoteUser": "root" +} diff --git a/src/main/java/org/archive/hadoop/jobs/WEATGenerator.java b/src/main/java/org/archive/hadoop/jobs/WEATGenerator.java index 24c7799..bd0b6f1 100644 --- a/src/main/java/org/archive/hadoop/jobs/WEATGenerator.java +++ b/src/main/java/org/archive/hadoop/jobs/WEATGenerator.java @@ -25,6 +25,7 @@ import org.archive.extract.ExtractorOutput; import org.archive.extract.ProducerUtils; import org.archive.extract.ResourceFactoryMapper; +import org.archive.extract.RealCDXExtractorOutput; import org.archive.extract.WATExtractorOutput; import org.archive.extract.WETExtractorOutput; import org.archive.format.json.JSONUtils; @@ -53,7 +54,7 @@ public class WEATGenerator extends Configured implements Tool { public final static String TOOL_DESCRIPTION = "Generate WAT and WET files from (W)ARC files stored in HDFS"; public static final Log LOG = LogFactory.getLog(WEATGenerator.class); - + public static class WEATGeneratorMapper extends MapReduceBase implements Mapper { private JobConf jobConf; @@ -89,21 +90,30 @@ public void map( Text key, Text value, OutputCollector output, Reporter reporter Path inputPath = new Path(path); Path basePath = inputPath.getParent().getParent(); + Path cdxBasePath = new Path(jobConf.get("cdxBasePath", basePath.toString())); String inputBasename = inputPath.getName(); String watOutputBasename = ""; String wetOutputBasename = ""; + String cdxWatOutputBasename = ""; + String cdxWetOutputBasename = ""; if(path.endsWith(".gz")) { watOutputBasename = inputBasename.substring(0,inputBasename.length()-3) + ".wat.gz"; wetOutputBasename = inputBasename.substring(0,inputBasename.length()-3) + ".wet.gz"; + cdxWatOutputBasename = inputBasename.replace(".warc.gz", ".wat.cdx.gz"); + cdxWetOutputBasename = inputBasename.replace(".warc.gz", ".wet.cdx.gz"); } else { watOutputBasename = inputBasename + ".wat.gz"; wetOutputBasename = inputBasename + ".wet.gz"; + cdxWatOutputBasename = inputBasename.replace(".warc", ".wat.cdx.gz"); + cdxWetOutputBasename = inputBasename.replace(".warc", ".wet.cdx.gz"); } String watOutputFileString = basePath.toString() + "/wat/" + watOutputBasename; String wetOutputFileString = basePath.toString() + "/wet/" + wetOutputBasename; + String cdxWetOutputFileString = cdxBasePath.toString() + "/" + cdxWetOutputBasename; + String cdxWatOutputFileString = cdxBasePath.toString() + "/" + cdxWatOutputBasename; LOG.info("About to write out to " + watOutputFileString + " and " + wetOutputFileString); if (this.jobConf.getBoolean("skipExisting", false)) { @@ -123,6 +133,17 @@ public void map( Text key, Text value, OutputCollector output, Reporter reporter FSDataOutputStream wetfsdOut = FileSystem.get(new java.net.URI(wetOutputFileString), this.jobConf).create(new Path(wetOutputFileString), false); ExtractorOutput wetOut = new WETExtractorOutput(wetfsdOut, wetOutputBasename); + FSDataOutputStream cdxWetfsOut = null; + ExtractorOutput cdxWetOut = null; + FSDataOutputStream cdxWatfsOut = null; + ExtractorOutput cdxWatOut = null; + if ( this.jobConf.getBoolean("outputCDX", false) ) { + cdxWetfsOut = FileSystem.get(new java.net.URI(cdxWetOutputFileString), this.jobConf).create(new Path(cdxWetOutputFileString), false); + cdxWetOut = new RealCDXExtractorOutput(new PrintWriter(cdxWetfsOut)); + cdxWatfsOut = FileSystem.get(new java.net.URI(cdxWatOutputFileString), this.jobConf).create(new Path(cdxWatOutputFileString), false); + cdxWatOut = new RealCDXExtractorOutput(new PrintWriter(cdxWatfsOut)); + } + int count = 0; Resource lr = null; while(count < Integer.MAX_VALUE) { @@ -143,6 +164,9 @@ public void map( Text key, Text value, OutputCollector output, Reporter reporter LOG.info("Outputting new record " + count); } watOut.output(r); + if( cdxWatOut != null ) { + cdxWatOut.output(r); + } if (lr != null && isMetaConcurrentTo(r, lr)) { JSONArray payloadMetadata = JSONUtils.extractArray(r.getMetaData().getTopMetaData(), "Envelope.Payload-Metadata.WARC-Metadata-Metadata.Metadata-Records"); @@ -152,6 +176,9 @@ public void map( Text key, Text value, OutputCollector output, Reporter reporter } if (lr != null) { wetOut.output(lr); + if( cdxWetOut != null ) { + cdxWetOut.output(lr); + } } lr = r; } @@ -160,6 +187,12 @@ public void map( Text key, Text value, OutputCollector output, Reporter reporter } watfsdOut.close(); wetfsdOut.close(); + if ( cdxWatfsOut != null ) { + cdxWatfsOut.close(); + } + if ( cdxWetfsOut != null ) { + cdxWetfsOut.close(); + } } catch ( Exception e ) { LOG.error( "Error processing file: " + path, e ); reporter.incrCounter("exporter", "errors", 1); @@ -259,6 +292,7 @@ public int run( String[] args ) throws Exception { // keep job running despite some failures in generating WATs job.setBoolean("strictMode",false); job.setBoolean("skipExisting", false); + job.setBoolean("outputCDX", false); job.setOutputFormat(NullOutputFormat.class); job.setOutputKeyClass(Text.class); @@ -274,6 +308,13 @@ public int run( String[] args ) throws Exception { } else if(args[arg].equals("-skipExisting")) { job.setBoolean("skipExisting", true); arg++; + } else if(args[arg].equals("-outputCDX")) { + job.setBoolean("outputCDX", true); + arg++; + } else if(args[arg].equals("-cdxBasePath")) { + job.set("cdxBasePath", args[arg+1]); + arg++; + arg++; } else { break; }