diff --git a/.gitignore b/.gitignore index 20b9ebc..ecfe406 100644 --- a/.gitignore +++ b/.gitignore @@ -37,6 +37,7 @@ nosetests.xml # Other mrjob.conf +mrjob*.conf index_env.sh .vagrant diff --git a/README.md b/README.md index 6b11366..420ca3f 100644 --- a/README.md +++ b/README.md @@ -1,18 +1,23 @@ -WebArchive Url Indexing +WebArchive URL Indexing ======================= -This project contains several scripts (MapReduce jobs) for generating url indexes of web archive collections, ususally containing large number of of WARC (or ARC) files. The scripts are designed to ran in Hadoop or Amazon EMR to process terabytes or even petabytes of web archive content. Additionally, thanks to flexibility of the MRJob library, -the scripts can also run on a local machine to build an index cluster. +This project contains several scripts (MapReduce jobs) for generating URL indexes of web archive collections, usually containing a large number of WARC (or ARC) files. The scripts are designed to run on Hadoop or Amazon EMR to process terabytes or even petabytes of web archive content. Additionally, thanks to the flexibility of the MRJob library, the scripts can also run on a local machine to build an index cluster. + +The indexer was originally implemented based on [mrjob](//github.com/Yelp/mrjob). Because mrjob is no longer maintained, it was ported to PySpark (based on [cc-pyspark](//github.com/commoncrawl/cc-pyspark/)) in 2024/2025. For running the Spark jobs, see [run_index_ccpyspark.sh](run_index_ccpyspark.sh) and [requirements_ccpyspark.txt](requirements_ccpyspark.txt). See also the documentation of [cc-pyspark](//github.com/commoncrawl/cc-pyspark/). + +The description below documents how to run the MapReduce jobs. ## Initial Setup and Usage -These tools use the MRJob Python library for Hadoop/EMR, and are a pure-python solution to web archive indexing. 
+Python 3 is required - see the branch `python-2.7` for a previous version running on Python 2.7 (not maintained anymore). 
+ +These tools use the MRJob Python library for Hadoop/EMR, and are a pure-Python solution to web archive indexing. To install [dependencies](#dependencies): `pip install -r requirements.txt` -#### Remote - EMR/Hadoop +### Remote - EMR/Hadoop -*Note: At this time, the scripts have been tested with the CommonCrawl data set on EMR (AMI 3.9.0 + Hadoop 2.4.0) and on CDH 5.8.0.* +*Note: At this time, the scripts have been tested with the Common Crawl data set on Apache Bigtop 1.5.0 and with Python 2.7 on CDH 6.3.2 and on EMR (AMI 3.9.0 + Hadoop 2.4.0).* To run with MRJob library on EMR, a system-specific `mrjob.conf` needs to be configured. The file contains all the settings necessary to specify your EMR cluster or to configure non-default settings on other Hadoop clusters. Refer to the [MRJob documentation for details](https://pythonhosted.org/mrjob/guides/configs-basics.html). The shell scripts to launch the tools are supposed to be run on EMR, for other Hadoop clusters replace `-r emr` by `-r hadoop`. @@ -20,13 +25,20 @@ In addition, a bash script `index_env.sh` is used to specify all the relevant pa You can simply run `cp index_env.sample.sh index_env.sh` to copy the provided sample. Please refer to the file for more details and to fill in the actual paths. Note that on EMR paths to AWS S3 have to be given as `s3://bucket/path` while on Hadoop (no EMR) paths must start with `s3a://`. -Requirements have to be installed on all nodes of the cluster. The script `bootstrap.sh` installs everything needed on EMR, including Python and packages necessary to compile the requirements. +Requirements have to be installed on all nodes of the cluster. The script [bootstrap.sh](./bootstrap.sh) installs everything needed on EMR, including Python and packages necessary to compile the requirements. + +The script [run_index_hadoop.sh](./run_index_hadoop.sh) runs all steps necessary to create the CDX index of a monthly Common Crawl. 
-#### Local +### Local No additional setup is necessary. See [building a local cluster](#building-a-local-cluster). -### Tools Provided +### S3 Read and Write Permissions + +Permissions to read and write from the involved locations on S3 need to be granted – in "remote" mode to all nodes of the cluster. This is best done by attaching IAM roles to the EC2 cluster instances, see [boto3 configuring credentials](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html#configuring-credentials). In local mode, a credentials file or environment variables could be also an option. + + +## Tools Provided This repository provides three Hadoop MapReduce jobs to create [a shared url index](#zipnum-sharded-cdx-cluster) from an input list of WARC/ARC files. This process can be split into three jobs. @@ -34,16 +46,14 @@ This repository provides three Hadoop MapReduce jobs to create [a shared url ind 2. [Sampling CDXs to Create Split File](#sampling-cdxs-to-create-split-file) 3. [Generating a ZipNum CDX Cluster](#generating-a-zipnum-cdx-cluster) -Each step is a MapReduce job, run with the Python MRJob library. The first step may be omitted if you already have -indexes for the WARCs. +Each step is a MapReduce job, run with the Python MRJob library. The first step may be omitted if you already have indexes for the WARCs. If you have a small number of local cdx files, you also use these scripts to [build a local cluster](#building-a-local-cluster) [Additional background info on indexing and the formats used](#additional-info). -## Indexing Individual ARC/WARCs to CDX Files ## - +### Indexing Individual ARC/WARCs to CDX Files ### *Note: If you already have .cdx files for each of your WARC/ARCS, you may skip this step* @@ -53,13 +63,13 @@ The job can be started by running: runindexwarcs.sh ``` -This boostraps the `indexwarcsjobs.py` script, which will start a map-reduce job to create a cdx file for each WARC/ARC file in the input. 
+This bootstraps the [indexwarcsjobs.py](./indexwarcsjobs.py) script, which will start a MapReduce job to create a cdx file for each WARC/ARC file in the input. **Input:** A manifest file of WARC/ARCs to be indexed -**Output:** A compressed cdx file (.cdx.gz) for each WARC/ARC processed. +**Output:** A compressed cdx file (`.cdx.gz`) for each WARC/ARC processed. -The path of each input is kept and the extension is replaced with .cdx.gz. +The path of each input is kept and the extension is replaced with `.cdx.gz`. Thus, for inputs: @@ -76,7 +86,7 @@ and output directory of `/cdx/`, the following will be created: This is a map only job, and a single mapper is created per input file by default. -The `pywb.warc.cdxindexer.write_cdx_index`, the same used by the pywb `cdx-indexer` app is used to create the index. +The `pywb.indexer.cdxindexer.write_cdx_index`, the same used by the pywb `cdx-indexer` app is used to create the index. Refer to `cdx-indexer -h` for list of possible options. ### Sampling CDXs to Create Split File ### @@ -87,15 +97,15 @@ This job can be started by running: runsample.sh ``` -The actual job, defined in `samplecdxjob.py` determines *split points* for the cluster for an arbitrary number of splits. The final job will sort all the lines from all the CDX files into N parts (determined by number of reducers), however, in order to do so, it is necessary to determine a rough distribution of the url space. +The actual job, defined in [samplecdxjob.py](./samplecdxjob.py) determines *split points* for the cluster for an arbitrary number of splits. The final job will sort all the lines from all the CDX files into N parts (determined by number of reducers), however, in order to do so, it is necessary to determine a rough distribution of the url space. 
**Input:** A path to per-WARC CDX files (created in step 1) -**Output:** A file containing split points to split CDX space into N shards (in hadoop SequenceFile format) to +**Output:** A file containing split points to split CDX space into N shards (in Hadoop SequenceFile format) to *Note: This step is generally only necessary the first time a cluster is created. If a subsequent cluster with similar distribution is created, it is possible to reuse an existing split file. Additionally, it will be possible to create a more accurate split file directly from an existing cluster (TODO)* -To create the split file, all the CDX files are sampled using [a reservoir sampling technique](http://had00b.blogspot.com/2013/07/random-subset-in-mapreduce.html) (This technique may need some refinement but only an *approximate* distribution is needed). +To create the split file, all the CDX files are sampled using [a reservoir sampling technique](https://had00b.blogspot.com/2013/07/random-subset-in-mapreduce.html) (This technique may need some refinement but only an *approximate* distribution is needed). The output of this job will be a single file with N-1 split points (for N parts/shards/reducers). @@ -104,7 +114,7 @@ The job creates a plain text file with N-1 lines. #### Converting to SequenceFile However, to be used with the final job, the file needs to be in a Hadoop `SequenceFile` format. -Fortunatelly, the `python-hadoop` library provides an easy way to convert a text file to a Hadoop SequenceFile of this format. The `dosample.py` script combines the map-reduce job with the SequenceFile conversion and then uploads the file sequencefile to final destination (currently S3 path). +Fortunately, the [python-hadoop](//github.com/commoncrawl/python-hadoop) library provides an easy way to convert a text file to a Hadoop SequenceFile of this format. 
The [dosample.py](./dosample.py) script combines the MapReduce job with the SequenceFile conversion and then uploads the file SequenceFile to final destination (currently S3 path). ### Generating a ZipNum CDX Cluster @@ -114,7 +124,7 @@ The final job can be started by running: runzipcluster.sh ``` -The corresponding script, `zipnumclusterjob.py`, creates the [ZipNum Sharded CDX Cluster](#zipnum-sharded-cdx-cluster) from the individual CDX files (created in the first job) using the split file (created in the second job). +The corresponding script, [zipnumclusterjob.py](./zipnumclusterjob.py), creates the [ZipNum Sharded CDX Cluster](#zipnum-sharded-cdx-cluster) from the individual CDX files (created in the first job) using the split file (created in the second job). **Input:** Per-WARC CDX files and split points file (from previous two steps) @@ -136,16 +146,15 @@ This index can then be used with existing tools, such as pywb and OpenWayback, w Thanks to the flexibility of the MRJob library, it is also possible to build a local ZipNum cluster, no Hadoop or EMR required! (MRJob automatically computes even split points when running locally, so the split file computation step is not necessary). -If you have a number of [CDX](#cdx-file-format) files on disk, you can use the `build_local_zipnum.py` script to directly build a cluster locally on your machine. +If you have a number of [CDX](#cdx-file-format) files on disk, you can use the [build_local_zipnum.py](./build_local_zipnum.py) script to directly build a cluster locally on your machine. -For example, the following will be a cluster of 25 shards. +For example, the following command creates a cluster of 25 shards. ``` python build_local_zipnum.py /path/to/zipnum/ -s 25 -p /path/to/cdx/*.cdx.gz ``` -(The `-p` flag will specify if parallel processes wil be created -for each map/reduce task, or (if absent) all tasks will be created sequentially). 
+(With the `-p` flag, a parallel process is created for each map/reduce task; if absent, all tasks are run sequentially). After the script runs, the following files will be created: ``` @@ -155,18 +164,17 @@ After the script runs, the following files will be created: /path/to/zipnum/cluster.loc ``` -The `cluster.summary` and `cluster.loc` files may be used with index ZipNum cluster support in the wayback machine, including -pywb and OpenWayback. +The `cluster.summary` and `cluster.loc` files may be used with index ZipNum cluster support in the wayback machine, including pywb and OpenWayback. ### Dependencies These tools depend on the following libraries/tools. If using Hadoop, they need to be installed on the cluster. -If Using EMR, the MRJob library can do this automatically when starting a new cluster, and a bootstrap script is also provided for easy installation seperate in a persistant EMR job flow. +If using EMR, the MRJob library can do this automatically when starting a new cluster, and a bootstrap script is also provided for easy separate installation in a persistent EMR job flow. - [pywb web replay tools](https://github.com/ikreymer/pywb) for creating CDX indexes from WARCs and ARCs - [MRJob](https://pythonhosted.org/mrjob/) MapReduce library for running MapReduce jobs on Hadoop, Amazon EMR or locally. -- [python-hadoop](https://github.com/matteobertozzi/Hadoop/tree/master/python-hadoop) - A python hadoop utility library for creating a hadoop SequenceFile in pure Python. (for generating split point SequenceFile) +- [python-hadoop](https://github.com/commoncrawl/python-hadoop) - A Python Hadoop utility library for creating a Hadoop SequenceFile in pure Python (used to define splits required for total-order sorting). This project is forked from Matteo Bertozzi's [Hadoop](https://github.com/matteobertozzi/Hadoop/tree/master/python-hadoop) and ported to Python 3. 
## Additional Info @@ -190,24 +198,18 @@ The distributed indexing job uses this tool to build an index for each file in p ### CDX File Format -An index for a web archive (WARC or ARC) file is often referred to as a CDX file, probably from **C**apture/**C**rawl -in**D**e**X** **(CDX)**. A CDX file is typically a sorted plain-text file (optionally gzip-compressed) format, with each line -representing info about a single capture in an archive. The CDX contains multiple fields, typically the url and where to -find the archived contents of that url. Unfortunately, no standardized format for CDX files exists, and there have been -many formats, usually with varying number of space-seperated fields. Here is an old reference for [CDX File](https://archive.org/web/researcher/cdx_file_format.php) (from Internet Archive). In practice, CDX files typically contain a subset of the possible fields. +An index for a web archive (WARC or ARC) file is often referred to as a CDX file, probably from **C**apture/**C**rawl in**D**e**X** **(CDX)**. A CDX file is typically a sorted plain-text file (optionally gzip-compressed) format, with each line representing info about a single capture in an archive. The CDX contains multiple fields, typically the url and where to find the archived contents of that url. Unfortunately, no standardized format for CDX files exists, and there have been many formats, usually with varying number of space-separated fields. Here is an old reference for [CDX File](https://archive.org/web/researcher/cdx_file_format.php) (from Internet Archive). In practice, CDX files typically contain a subset of the possible fields. -While there are no required fields, in practice, the following 6 fields -are needed to identify a record: `url search key`, `url timestamp`, `original url`, `archive file`, `archive offset`, `archive length`. The search key is often the url transformed and 'canonicalized' in a way to make it easier for lexigraphic seaching. 
+While there are no required fields, in practice, the following 6 fields are needed to identify a record: `url search key`, `url timestamp`, `original url`, `archive file`, `archive offset`, `archive length`. The search key is often the url transformed and 'canonicalized' in a way to make it easier for lexicographic searching. A common transformation is to reverse subdomains `example.com` -> `com,example,)/` to allow for searching by domain, then subdomains. -The indexing job uses the flexible pywb `cdx-indexer` to create indexs of a certain format. However, the other jobs are compatible with any existing CDX format as well. Other indexing tools can be used also but require seperate integration. +The indexing job uses the flexible pywb `cdx-indexer` to create indexes of a certain format. However, the other jobs are compatible with any existing CDX format as well. Other indexing tools can be used also but require separate integration. ### ZipNum Sharded CDX Cluster A CDX file is generally accessed by doing a simple binary search through the file. This scales well to very large (multi-gigabyte) CDX files. However, for very large archives (many terabytes or petabytes), binary search across a single file has its limits. A more scalable alternative to a single CDX file is gzip compressed chunked cluster, with a binary searchable index. -In this format, sometimes called the ZipNum or Ziplines cluster (for some X number of cdx lines zipped together), all actual -CDX lines are gzipped compressed an concatenated together. To allow for random access, the lines are gzipped in groups of X lines (often 3000, but can be anything). This allows for the full index to be spread over N number of gzipped files, but has the overhead of requiring N lines to be read for each lookup. Generally, this overhead is negligible when looking up large indexes, and non-existent when doing a range query across many CDX lines. 
+In this format, sometimes called the ZipNum or Ziplines cluster (for some X number of cdx lines zipped together), all actual CDX lines are gzip-compressed and concatenated together. To allow for random access, the lines are gzipped in groups of X lines (often 3000, but can be anything). This allows for the full index to be spread over N number of gzipped files, but has the overhead of requiring N lines to be read for each lookup. Generally, this overhead is negligible when looking up large indexes, and non-existent when doing a range query across many CDX lines. The goal of the last job is to create such a index, split into a number of arbitrary shards. For each shard, there is an index file and a secondary index file. At the end, the secondary index is concatenated to form the final, binary searchable index. The number of shards is variable and is equal to the number of reducers used. diff --git a/bootstrap.sh b/bootstrap.sh index 91929a9..2e379c6 100644 --- a/bootstrap.sh +++ b/bootstrap.sh @@ -1,10 +1,5 @@ #!/bin/bash # bootstrap commands -sudo yum install -y python27 python27-devel gcc-c++ git libffi-devel -sudo curl -o /tmp/get-pip.py https://bootstrap.pypa.io/get-pip.py -sudo python2.7 /tmp/get-pip.py -# http://superuser.com/questions/762185/python2-7-pip2-7-install-in-centos6-root-does-not-see-usr-local-bin -sudo /usr/local/bin/pip2.7 install boto mrjob simplejson -sudo /usr/local/bin/pip2.7 install pywb -#sudo pip2.7 install -e "git+git://github.com/ikreymer/pywb.git@develop#egg=pywb-0.9.7-dev" +sudo yum install -y python3 python3-devel python3-pip gcc-c++ git libffi-devel +sudo pip3 install boto3 mrjob simplejson pywb diff --git a/build_local_zipnum.py b/build_local_zipnum.py index d39f44d..7518f45 100644 --- a/build_local_zipnum.py +++ b/build_local_zipnum.py @@ -1,11 +1,13 @@ -import os import glob -from argparse import ArgumentParser -from zipnumclusterjob import ZipNumClusterJob -from mrjob.launch import MRJobLauncher import logging +import os 
import sys +from argparse import ArgumentParser + +from mrjob.job import MRJob + +from zipnumclusterjob import ZipNumClusterJob log = logging.getLogger(__name__) @@ -55,8 +57,8 @@ def build_summary_and_loc(output_dir): with open(filein, 'r+b') as partfh: for line in partfh: line = line.rstrip() - line += '\t' + str(count) - fh.write(line + '\n') + line += b'\t' + str(count).encode('utf-8') + fh.write(line + b'\n') count += 1 # Write loc file @@ -68,7 +70,7 @@ def build_summary_and_loc(output_dir): print('Building Loc File: ' + loc_file) with open(loc_file, 'w+b') as fh: for filename in inputs: - fh.write(os.path.basename(filename) + '\t' + filename + '\n') + fh.write((os.path.basename(filename) + '\t' + filename + '\n').encode('utf-8')) def main(): @@ -82,13 +84,13 @@ def main(): help='Number of lines per gzip block (default 3000)') parser.add_argument('-p', '--parallel', action='store_true', - help='Run in parllel (multiple maps/reducer processes)') + help='Run in parallel (multiple maps/reducer processes)') r = parser.parse_args() - MRJobLauncher.set_up_logging(quiet=False, - verbose=False, - stream=sys.stderr) + MRJob.set_up_logging(quiet=False, + verbose=False, + stream=sys.stderr) log.setLevel(logging.INFO) compat_log = logging.getLogger('mrjob.compat') diff --git a/dosample.py b/dosample.py index 55db3cd..5396c52 100644 --- a/dosample.py +++ b/dosample.py @@ -1,16 +1,28 @@ -from samplecdxjob import SampleCDXJob -from seqfileutils import make_text_null_seq - +import logging +import os import sys import tempfile -import os + +from mrjob.util import log_to_stream + +from samplecdxjob import SampleCDXJob +from seqfileutils import make_text_null_seq SEQ_FILE = 'splits.seq' SPL_FILE = 'splits.txt' +LOG = logging.getLogger('SampleCDXJob') +LOG_FORMAT = "%(asctime)s %(levelname)s %(name)s: %(message)s" + + def run_sample_job(): job = SampleCDXJob(args=sys.argv[1:]) + verbose = '--verbose' in sys.argv[1:] + log_to_stream(format=LOG_FORMAT, name='SampleCDXJob', 
debug=verbose) + log_to_stream(format=LOG_FORMAT, name='mrjob', debug=verbose) + log_to_stream(format=LOG_FORMAT, name='__main__', debug=verbose) + with job.make_runner() as runner: runner.run() @@ -21,7 +33,7 @@ def run_sample_job(): # dump streaming output to file with open(SPL_FILE, 'wb') as fh: - for x in runner.stream_output(): + for x in runner.cat_output(): fh.write(x) fh.close() diff --git a/index_env.sample.sh b/index_env.sample.sh index 9c37687..a25a341 100644 --- a/index_env.sample.sh +++ b/index_env.sample.sh @@ -1,6 +1,4 @@ #!/bin/bash -export AWS_ACCESS_KEY_ID="" -export AWS_SECRET_ACCESS_KEY="" export WARC_MANIFEST="" export WARC_CDX_BUCKET="" diff --git a/indexwarcs_cc_pyspark.py b/indexwarcs_cc_pyspark.py new file mode 100644 index 0000000..2eec9ef --- /dev/null +++ b/indexwarcs_cc_pyspark.py @@ -0,0 +1,86 @@ +from gzip import GzipFile +from tempfile import TemporaryFile + +from pywb.indexer.cdxindexer import write_cdx_index + +from sparkcc import CCFileProcessorSparkJob + + + +class IndexWARCJob(CCFileProcessorSparkJob): + """ This job receives as input a manifest of WARC/ARC files and produces + a CDX index per file + + The pywb.indexer.cdxindexer is used to create the index, with a fixed set of options + """ + + name = 'IndexWARCJob' + + # description of input and output shown by --help + input_descr = "Path to file listing input paths (WARC/WAT/WET/ARC)" + output_descr = """Table containing the output CDX files +(in spark.sql.warehouse.dir) and the indexing status: + 1 successfully created, + 0 already exists, + -1 processing failed""" + + # PyWB index options + index_options = { + 'surt_ordered': True, + 'sort': True, + 'cdxj': True, + #'minimal': True + } + + def add_arguments(self, parser): + super(CCFileProcessorSparkJob, self).add_arguments(parser) + parser.add_argument("--output_base_url", required=True, + help="Destination for CDX output.") + parser.add_argument("--skip_existing", action='store_true', + help="Skip processing files for 
which " + "the output CDX file already exists.") + + def _conv_warc_to_cdx_path(self, warc_path): + cdx_path = warc_path.replace('crawl-data', 'cc-index/cdx') + cdx_path = cdx_path.replace('.warc.gz', '.cdx.gz') + cdx_path = cdx_path.replace('.warc.wet.gz', '.wet.cdx.gz') + cdx_path = cdx_path.replace('.warc.wat.gz', '.wat.cdx.gz') + return cdx_path + + def process_file(self, warc_path, tempfd): + + cdx_path = self._conv_warc_to_cdx_path(warc_path) + + self.get_logger().info('Indexing WARC: %s', warc_path) + + if self.args.skip_existing and \ + self.check_for_output_file(cdx_path,self.args.output_base_url): + self.get_logger().info('Already Exists: %s', cdx_path) + yield cdx_path, 0 + return + + with TemporaryFile(mode='w+b', + dir=self.args.local_temp_dir) as cdxtemp: + + success = False + with GzipFile(fileobj=cdxtemp, mode='w+b') as cdxfile: + try: + write_cdx_index(cdxfile, tempfd, warc_path, **self.index_options) + success = True + except Exception as exc: + self.get_logger().error('Failed to index %s: %s', warc_path, exc) + + cdxtemp.flush() + cdxtemp.seek(0) + + if success: + self.write_output_file(cdx_path, cdxtemp, self.args.output_base_url) + self.get_logger().info('Successfully uploaded CDX: %s', cdx_path) + yield cdx_path, 1 + else: + yield cdx_path, -1 + + +if __name__ == "__main__": + job = IndexWARCJob() + job.run() diff --git a/indexwarcsjob.py b/indexwarcsjob.py index bf48fa2..5a6de8b 100644 --- a/indexwarcsjob.py +++ b/indexwarcsjob.py @@ -1,20 +1,31 @@ -import boto +import logging import sys +from datetime import datetime + +import boto3 +import botocore + from mrjob.job import MRJob from mrjob.protocol import RawValueProtocol +from mrjob.util import log_to_stream from tempfile import TemporaryFile -from pywb.warc.cdxindexer import write_cdx_index +from pywb.indexer.cdxindexer import write_cdx_index from gzip import GzipFile +LOG = logging.getLogger('IndexWARCJob') +log_to_stream(format="%(asctime)s %(levelname)s %(name)s: %(message)s", + 
name='IndexWARCJob') + + #============================================================================= class IndexWARCJob(MRJob): """ This job receives as input a manifest of WARC/ARC files and produces a CDX index per file - The pywb.warc.cdxindexer is used to create the index, with a fixed set of options + The pywb.indexer.cdxindexer is used to create the index, with a fixed set of options TODO: add way to customized indexing options. """ @@ -32,32 +43,49 @@ class IndexWARCJob(MRJob): 'mapreduce.input.lineinputformat.linespermap': 2, } - def configure_options(self): + def configure_args(self): """Custom command line options for indexing""" - super(IndexWARCJob, self).configure_options() + super(IndexWARCJob, self).configure_args() + + self.add_passthru_arg('--warc_bucket', dest='warc_bucket', + default='commoncrawl', + help='source bucket for warc paths, if input is a relative path (S3 Only)') - self.add_passthrough_option('--warc_bucket', dest='warc_bucket', - default='commoncrawl', - help='source bucket for warc paths, if input is a relative path (S3 Only)') + self.add_passthru_arg('--cdx_bucket', dest='cdx_bucket', + default='my_cdx_bucket', + help='destination bucket for cdx (S3 Only)') - self.add_passthrough_option('--cdx_bucket', dest='cdx_bucket', - default='my_cdx_bucket', - help='destination bucket for cdx (S3 Only)') + self.add_passthru_arg('--skip-existing', dest='skip_existing', action='store_true', + help='skip processing files that already have CDX', + default=True) - self.add_passthrough_option('--skip-existing', dest='skip_existing', action='store_true', - help='skip processing files that already have CDX', - default=True) + self.add_passthru_arg("--s3_local_temp_dir", dest='s3_local_temp_dir', + help='Local temporary directory to buffer content from S3', + default=None) def mapper_init(self): - # Note: this assumes that credentials are set via - # AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY env variables - self.conn = boto.connect_s3() + # Note: 
this assumes that credentials are properly configured, see + # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html#configuring-credentials + # best via IAM roles: + # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html#best-practices-for-configuring-credentials + self.boto_config = botocore.client.Config( + read_timeout=180, + retries={'max_attempts' : 20}) + s3client = boto3.client('s3', config=self.boto_config) - self.warc_bucket = self.conn.lookup(self.options.warc_bucket) - assert(self.warc_bucket) + try: + s3client.head_bucket(Bucket=self.options.warc_bucket) + except botocore.exceptions.ClientError as e: + LOG.error('Failed to access bucket %s: %s', + self.options.warc_bucket, e) + return - self.cdx_bucket = self.conn.lookup(self.options.cdx_bucket) - assert(self.cdx_bucket) + try: + s3client.head_bucket(Bucket=self.options.cdx_bucket) + except botocore.exceptions.ClientError as e: + LOG.error('Failed to access bucket %s: %s', + self.options.cdx_bucket, e) + return self.index_options = { 'surt_ordered': True, @@ -71,42 +99,68 @@ def mapper(self, _, line): try: self._load_and_index(warc_path) except Exception as exc: - sys.stderr.write(warc_path + '\n') + LOG.error('Failed to index %s', warc_path) raise def _conv_warc_to_cdx_path(self, warc_path): # set cdx path cdx_path = warc_path.replace('crawl-data', 'cc-index/cdx') cdx_path = cdx_path.replace('.warc.gz', '.cdx.gz') + cdx_path = cdx_path.replace('.warc.wet.gz', '.wet.cdx.gz') + cdx_path = cdx_path.replace('.warc.wat.gz', '.wat.cdx.gz') return cdx_path def _load_and_index(self, warc_path): - warckey = self.warc_bucket.get_key(warc_path) cdx_path = self._conv_warc_to_cdx_path(warc_path) - if self.options.skip_existing: - cdxkey = self.cdx_bucket.get_key(cdx_path) + LOG.info('Indexing WARC: %s', warc_path) + s3client = boto3.client('s3', config=self.boto_config) - if cdxkey: - sys.stderr.write('Already Exists: {}\n'.format(cdx_path)) + if 
self.options.skip_existing: + try: + s3client.head_object(Bucket=self.options.cdx_bucket, + Key=cdx_path) + LOG.info('Already Exists: %s', cdx_path) return + except botocore.client.ClientError as exception: + pass # ok, not found - with TemporaryFile(mode='w+b') as warctemp: - warckey.get_file(warctemp, override_num_retries=10) + try: + s3client.head_object(Bucket=self.options.warc_bucket, + Key=warc_path) + except botocore.client.ClientError as exception: + LOG.error('WARC not found: %s', warc_path) + return + + with TemporaryFile(mode='w+b', + dir=self.options.s3_local_temp_dir) as warctemp: + LOG.info('Fetching WARC: %s', warc_path) + try: + s3client.download_fileobj(self.options.warc_bucket, warc_path, warctemp) + except botocore.client.ClientError as exception: + LOG.error('Failed to download %s: %s', warc_path, exception) + return warctemp.seek(0) + LOG.info('Successfully fetched WARC: %s', warc_path) - with TemporaryFile(mode='w+b') as cdxtemp: + with TemporaryFile(mode='w+b', + dir=self.options.s3_local_temp_dir) as cdxtemp: with GzipFile(fileobj=cdxtemp, mode='w+b') as cdxfile: # Index to temp write_cdx_index(cdxfile, warctemp, warc_path, **self.index_options) # Upload temp - cdxkey = self.cdx_bucket.new_key(cdx_path) cdxtemp.flush() - - cdxkey.set_contents_from_file(cdxtemp, rewind=True) + cdxtemp.seek(0) + LOG.info('Uploading CDX: %s', cdx_path) + try: + s3client.upload_fileobj(cdxtemp, self.options.cdx_bucket, cdx_path) + except botocore.client.ClientError as exception: + LOG.error('Failed to upload %s: %s', cdx_path, exception) + return + LOG.info('Successfully uploaded CDX: %s', cdx_path) if __name__ == "__main__": diff --git a/publish_index.sh b/publish_index.sh new file mode 100755 index 0000000..bd4b772 --- /dev/null +++ b/publish_index.sh @@ -0,0 +1,47 @@ +#!/bin/bash + +YEARWEEK="$1" +MONTH="$2" + +if [ -z "$YEARWEEK" ]; then + echo "$0 []" + exit 1 +fi + +set -x +set -e + + +## Create the metadata (title) for the index on the website +if ! 
[ -e "$YEARWEEK-metadata.yaml" ]; then + if [ -n "$MONTH" ]; then + YEAR=${YEARWEEK%%-*} + echo "title: '${MONTH^} $YEAR Index'" >"$YEARWEEK-metadata.yaml" + else + aws s3 cp s3://commoncrawl/cc-index/collections/CC-MAIN-2015-18/metadata.yaml "$YEARWEEK-metadata.yaml" + echo "Please, edit $YEARWEEK-metadata.yaml" + exit 1 + fi +fi +aws s3 cp "$YEARWEEK-metadata.yaml" "s3://commoncrawl/cc-index/collections/CC-MAIN-$YEARWEEK/metadata.yaml" + + +echo "Prepare and install cluster.idx if not yet done" +# Note: This is required for mrjob-based implementation, but not for that based on cc-pyspark. +# The jobs zipnumcluster_cc_pyspark.py already does the concatenation of the 300 per-partition +# *.idx files into the cluster.idx +if aws s3 ls s3://commoncrawl/cc-index/collections/CC-MAIN-$YEARWEEK/indexes/cluster.idx; then + echo "cluster.idx already exists on s3://commoncrawl/cc-index/collections/CC-MAIN-$YEARWEEK/indexes/" + exit 0 +fi + +test -d "cdx-$YEARWEEK" || mkdir "cdx-$YEARWEEK" +cd "cdx-$YEARWEEK" + +## create cluster index + +aws s3 cp --recursive --exclude '*' --include 'part-*' "s3://commoncrawl-index-temp/CC-MAIN-$YEARWEEK/indexes/" ./ +cat part-* | awk '{printf "%s\t%s\n",$0,NR}' >cluster.idx +LC_ALL=C sort -c ./cluster.idx + +aws s3 cp ./cluster.idx "s3://commoncrawl/cc-index/collections/CC-MAIN-$YEARWEEK/indexes/cluster.idx" diff --git a/requirements.txt b/requirements.txt index 84f7ad7..84605b9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,4 @@ mrjob -boto +boto3 pywb -#-e git+https://github.com/matteobertozzi/Hadoop.git#egg=hadoop&subdirectory=python-hadoop --e git+https://github.com/commoncrawl/python-hadoop.git#egg=master +-e git+https://github.com/commoncrawl/python-hadoop.git@main#egg=hadoop diff --git a/requirements_ccpyspark.txt b/requirements_ccpyspark.txt new file mode 100644 index 0000000..9d3d21f --- /dev/null +++ b/requirements_ccpyspark.txt @@ -0,0 +1,3 @@ +boto3 +# PyWB is required to index WARC files, running 
indexwarcs_cc_pyspark.py +pywb diff --git a/run_index_ccpyspark.sh b/run_index_ccpyspark.sh new file mode 100755 index 0000000..2484eec --- /dev/null +++ b/run_index_ccpyspark.sh @@ -0,0 +1,188 @@ +#!/bin/bash + +if [ $# -lt 2 ]; then + cat <<"EOF" +$0 [] + +Create a Common Crawl CDX ZipNum index for a monthly crawl. All steps are run on Spark. + + Year and week of the monthly crawl to be indexed, e.g. 2016-44 + used to determine the final location of the index + s3://commoncrawl/cc-index/collections/CC-MAIN-2016-44/... + Also locations for temporary files include the crawl name. + + List of WARC file objects to be indexed, e.g, the WARC/WAT/WET list + s3://commoncrawl/crawl-data/CC-MAIN-2016-44/warc.paths + or any subset or union of multiple WARC listings (incl. robots.txt WARCs). + Paths in the list must be keys/objects in the Common Crawl bucket + or another bucket configured in this script (WARC_PREFIX). + The path to the list must be an absolute URL on HDFS or S3A. + + The "index warcs" step is skipped if an empty string is passed as argument. + Since 2018 the per-WARC CDX files are written directly by the Fetcher + and include index fields combined from the WARC response and metadata record. + The latter holds the detected language and charset. + + Optional split file to be reused from previous crawl with similar distribution of URLs. + If not given, splits are calculated and saved on the default split file path. 
+ +EOF + exit 1 +fi + + +YEARWEEK="$1" +WARC_MANIFEST="$2" +REUSE_SPLIT_FILE="$3" + +CRAWL="CC-MAIN-$YEARWEEK" + +echo "Generating cc-index for $CRAWL" +echo +echo WARC_MANIFEST="$WARC_MANIFEST" +echo + +# Path prefix of WARC/WAT/WET files listed in WARC_MANIFEST +WARC_PREFIX="s3://commoncrawl/" + +# AWS S3 bucket to hold CDX files +WARC_CDX_BUCKET="commoncrawl-index-temp" +WARC_CDX_PREFIX="s3://$WARC_CDX_BUCKET/" + +# Location of the CDX status table +SPARK_SQL_WAREHOUSE="s3a://$WARC_CDX_BUCKET/$CRAWL" +CDX_STATUS_TABLE="cdx_status" + + +# glob pattern to match all CDX files generated in step 1 (indexwarcs_cc_pyspark.py) +# or available otherwise. The URI scheme must be supported by Hadoop / HDFS. +WARC_CDX="s3a://$WARC_CDX_BUCKET/$CRAWL/cdx/segments/*/*/*.cdx.gz" + + +### ZipNum definitions +ZIPNUM_N_LINES=3000 +ZIPNUM_N_PARTITIONS=300 + +# SPLIT_FILE could be reused from previous crawl with similar distribution of URLs, see REUSE_SPLIT_FILE +SPLIT_FILE="s3a://$WARC_CDX_BUCKET/$CRAWL/partition_boundaries.json" +# if explicitely configured +if [ -n "$REUSE_SPLIT_FILE" ]; then + echo "Reusing SPLIT_FILE $REUSE_SPLIT_FILE" + SPLIT_FILE="$REUSE_SPLIT_FILE" +fi + +# temporary output path of part-n files of the zipnum job, concatenated into the cluster.idx +ZIPNUM_TEMP_DIR="s3://$WARC_CDX_BUCKET/$CRAWL/indexes/" + +# final path to ZipNum index files +ZIPNUM_CLUSTER_DIR="s3://commoncrawl/cc-index/collections/$CRAWL/indexes/" + + +# configure S3 buffer directory +# - must exist on task/compute nodes for buffering data +# - should provide several GBs of free space to hold temporarily +# the downloaded data (WARC, WAT, WET files to be indexed), +# only relevant for the indexwarcs_cc_pyspark job. 
+if [ -n "$S3_LOCAL_TEMP_DIR" ]; then + S3_LOCAL_TEMP_DIR="--local_temp_dir=$S3_LOCAL_TEMP_DIR" +else + S3_LOCAL_TEMP_DIR="" +fi + + + +### PySpark definitions +export PYSPARK_PYTHON="python" # or "python3" + +# Python dependencies (for simplicity, include all Python files: cc-pyspark/*.py) +PYFILES=sparkcc.py + +### Spark configuration + +SPARK_ON_YARN="--master yarn" +SPARK_HADOOP_OPTS="" +SPARK_EXTRA_OPTS="" + +# defines SPARK_HOME, SPARK_HADOOP_OPTS and HADOOP_CONF_DIR +. spark_env.sh + +NUM_EXECUTORS=${NUM_EXECUTORS:-1} +EXECUTOR_CORES=${EXECUTOR_CORES:-2} +# input partitions for the WARC-to-CDX step +NUM_WARC_INPUT_PARTITIONS=${NUM_WARC_INPUT_PARTITIONS:-10} + +export LC_ALL=C + +set -e +set -x + + +if [ -n "$WARC_MANIFEST" ]; then + # Index WARC files in the manifest, write one CDX file per WARC + EXECUTOR_MEM=${EXECUTOR_MEM:-2g} + if [[ $NUM_WARC_INPUT_PARTITIONS -lt $((NUM_EXECUTORS*EXECUTOR_CORES)) ]]; then + echo "The number of input partitions is too low to utilize all executor cores" + exit 1 + fi + $SPARK_HOME/bin/spark-submit \ + $SPARK_ON_YARN \ + $SPARK_HADOOP_OPTS \ + --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \ + --conf spark.task.maxFailures=5 \ + --conf spark.executor.memory=$EXECUTOR_MEM \ + --conf spark.driver.memory=3g \ + --conf spark.core.connection.ack.wait.timeout=600s \ + --conf spark.network.timeout=300s \ + --conf spark.shuffle.io.maxRetries=50 \ + --conf spark.shuffle.io.retryWait=600s \ + --conf spark.locality.wait=1s \ + --conf spark.executorEnv.LD_LIBRARY_PATH=/usr/lib/hadoop/lib/native \ + --num-executors $NUM_EXECUTORS \ + --executor-cores $EXECUTOR_CORES \ + --executor-memory $EXECUTOR_MEM \ + --conf spark.sql.warehouse.dir="$SPARK_SQL_WAREHOUSE" \ + --py-files $PYFILES \ + indexwarcs_cc_pyspark.py \ + --input_base_url="$WARC_PREFIX" \ + --output_base_url="$WARC_CDX_PREFIX" \ + $S3_LOCAL_TEMP_DIR \ + --num_input_partitions=$NUM_WARC_INPUT_PARTITIONS \ + --num_output_partitions=1 \ + "$WARC_MANIFEST" 
"$CDX_STATUS_TABLE" +fi + + +### Create ZipNum index +EXECUTOR_MEM=${EXECUTOR_MEM:-3g} + +$SPARK_HOME/bin/spark-submit \ + $SPARK_ON_YARN \ + $SPARK_HADOOP_OPTS \ + --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \ + --conf spark.task.maxFailures=5 \ + --conf spark.executor.memory=$EXECUTOR_MEM \ + --conf spark.driver.memory=6g \ + --conf spark.driver.maxResultSize=4g \ + --conf spark.core.connection.ack.wait.timeout=600s \ + --conf spark.network.timeout=300s \ + --conf spark.shuffle.io.maxRetries=50 \ + --conf spark.shuffle.io.retryWait=600s \ + --conf spark.locality.wait=1s \ + --conf spark.io.compression.codec=zstd \ + --conf spark.checkpoint.compress=true \ + --conf spark.executorEnv.LD_LIBRARY_PATH=/usr/lib/hadoop/lib/native \ + --num-executors $NUM_EXECUTORS \ + --executor-cores $EXECUTOR_CORES \ + --executor-memory $EXECUTOR_MEM \ + --py-files $PYFILES \ + zipnumcluster_cc_pyspark.py \ + $S3_LOCAL_TEMP_DIR \ + --input_base_url="" \ + --output_base_url="$ZIPNUM_CLUSTER_DIR" \ + --temporary_output_base_url="$ZIPNUM_TEMP_DIR" \ + --partition_boundaries_file="$SPLIT_FILE" \ + --num_lines=$ZIPNUM_N_LINES \ + --num_output_partitions=$ZIPNUM_N_PARTITIONS \ + "$WARC_CDX" "" + + diff --git a/run_index_hadoop.sh b/run_index_hadoop.sh index a3749e5..1edb354 100755 --- a/run_index_hadoop.sh +++ b/run_index_hadoop.sh @@ -11,45 +11,62 @@ Create a Common Crawl index for a monthly crawl. All steps are run on Hadoop. s3://commoncrawl/cc-index/collections/CC-MAIN-2016-44/... list of WARC file objects to be indexed, e.g, the WARC list - s3://commoncrawl/crawl-data/CC-MAIN-2016-44/warc.paths.gz + s3://commoncrawl/crawl-data/CC-MAIN-2016-44/warc.paths + or any subset or union of multiple WARC lists (incl. robots.txt WARCs). Paths in the list must be keys/objects in the Common Crawl bucket. The path to the list must be a valid and complete HDFS or S3A URL, e.g. 
hdfs://hdfs-master.example.com/user/hadoop-user/CC-MAIN-2016-44.paths + The list must not be compressed to allow that the paths list is split into + multiple tasks (see mapreduce.input.lineinputformat.linespermap). + The "index warcs" step is skipped if an empty string is passed as argument. Optional split file to be reused from previous crawl with similar distribution of URLs. If not given, splits are calculated and saved on the default split file path. -Environment variables depend upon: - AWS_ACCESS_KEY_ID - AWS credentials used by Boto to access the bucket (read and write) - AWS_SECRET_ACCESS_KEY EOF exit 1 fi -if [ -z "$AWS_ACCESS_KEY_ID" ] || [ -z "$AWS_SECRET_ACCESS_KEY" ]; then - echo "AWS credentials must passed to Boto via environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY!" - exit 1 -fi - YEARWEEK="$1" WARC_MANIFEST="$2" REUSE_SPLIT_FILE="$3" -echo "Generating cc-index for $YEARWEEK" +CRAWL="CC-MAIN-$YEARWEEK" + +echo "Generating cc-index for $CRAWL" echo echo WARC_MANIFEST="$WARC_MANIFEST" echo -export WARC_CDX="s3a://commoncrawl/cc-index/cdx/CC-MAIN-$YEARWEEK/segments/*/*/*.cdx.gz" +# final path to index files +export ZIPNUM_CLUSTER_DIR="s3a://commoncrawl/cc-index/collections/$CRAWL/indexes/" -export WARC_CDX_BUCKET="commoncrawl" +# AWS S3 bucket to hold CDX files +export WARC_CDX_BUCKET="commoncrawl-index-temp" -export ZIPNUM_CLUSTER_DIR="s3a://commoncrawl/cc-index/collections/CC-MAIN-$YEARWEEK/indexes/" +# glob pattern to match all CDX files generated in step 1 (indexwarcsjob.py) +# (filesystem protocol must be supported by the used Hadoop version) +export WARC_CDX="s3a://$WARC_CDX_BUCKET/$CRAWL/cdx/segments/*/*/*.cdx.gz" +export WARC_CDX_SAMPLE="$WARC_CDX" +# if URLs are randomly distributed over WARC/CDX files, +# a sample (20-30%) is enough to determine the splits of the final CDX shards +# (simple approach: take 30% of segments) +export WARC_CDX_SAMPLE="s3a://$WARC_CDX_BUCKET/$CRAWL/cdx/segments/*[358]/*/*.cdx.gz" # SPLIT_FILE 
could be reused from previous crawl with similar distribution of URLs, see REUSE_SPLIT_FILE -export SPLIT_FILE="s3a://cc-cdx-index/${YEARWEEK}_splits.seq" +export SPLIT_FILE="s3a://$WARC_CDX_BUCKET/$CRAWL/splits.seq" + +# output path of part-n files of the zipnum job, later concatenated into the cluster.idx +export ZIPNUM_OUTPUT_DIR="s3a://$WARC_CDX_BUCKET/$CRAWL/indexes/" + +# configure S3 buffer directory +if [ -n "$S3_LOCAL_TEMP_DIR" ]; then + S3_LOCAL_TEMP_DIR="--s3_local_temp_dir=$S3_LOCAL_TEMP_DIR" +else + S3_LOCAL_TEMP_DIR="" +fi export LC_ALL=C @@ -59,61 +76,80 @@ set -x if [ -n "$WARC_MANIFEST" ]; then - python indexwarcsjob.py \ - --cdx_bucket=$WARC_CDX_BUCKET \ + python3 indexwarcsjob.py \ + --cdx_bucket=$WARC_CDX_BUCKET \ --no-output \ --cleanup NONE \ --skip-existing \ - --cmdenv AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID \ - --cmdenv AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY \ -r hadoop \ - --jobconf "mapreduce.map.memory.mb=800" \ - --jobconf "mapreduce.map.java.opts=-Xmx512m" \ - $WARC_MANIFEST + --jobconf "mapreduce.map.memory.mb=1600" \ + --jobconf "mapreduce.map.java.opts=-Xmx1024m" \ + "$S3_LOCAL_TEMP_DIR" \ + "$WARC_MANIFEST" fi if [ -n "$REUSE_SPLIT_FILE" ]; then + echo "Reusing SPLIT_FILE $REUSE_SPLIT_FILE" SPLIT_FILE="$REUSE_SPLIT_FILE" else - # mapreduce.output.fileoutputformat.compress=true + # mapreduce.map.output.compress=true # must compress task output to avoid that the single reducer node fails with a full disk # anyway, it may require 60 GB of local disk space on the reducer node # mapreduce.map.memory.mb=640 # mappers read only small cdx files: minimal memory requirements - python dosample.py \ - --verbose \ + # mapreduce.reduce.memory.mb (use default) + # reducer needs enough memory to hold all data during the shuffle phase + # --jobconf "mapreduce.reduce.memory.mb=2730" \ + # --jobconf "mapreduce.reduce.java.opts=-Xmx2252m" \ + # mapreduce.output.fileoutputformat.compress=false + # must not compress output, even if this is the 
default, because it may not + # be readable from Python via seqfileutils.py. Alternatively, compress + # and decompress the data explicitly. + test -e splits.txt && rm splits.txt + test -e splits.seq && rm splits.seq + python3 dosample.py \ --shards=300 \ - --splitfile=$SPLIT_FILE \ - --cmdenv AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID \ - --cmdenv AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY \ - --jobconf "mapreduce.map.memory.mb=640" \ - --jobconf "mapreduce.map.java.opts=-Xmx512m" \ - --jobconf "mapreduce.reduce.memory.mb=1024" \ - --jobconf "mapreduce.reduce.java.opts=-Xmx512m" \ - --jobconf "mapreduce.output.fileoutputformat.compress=true" \ - -r hadoop $WARC_CDX - mv splits.seq $(basename s3${SPLIT_FILE#s3a}) - - if s3cmd info s3${SPLIT_FILE#s3a}; then - echo "Ok, split file was upload" + --splitfile="$SPLIT_FILE" \ + --jobconf "mapreduce.map.memory.mb=1600" \ + --jobconf "mapreduce.map.java.opts=-Xmx1024m" \ + --jobconf "mapreduce.map.output.compress=true" \ + --jobconf "mapreduce.output.fileoutputformat.compress=false" \ + -r hadoop "$WARC_CDX_SAMPLE" + + # in case, the sequence file wasn't written: + # 1. verify the content + # less splits.txt + # or (in case it's compressed) + # hadoop fs -text file:$PWD/splits.txt >splits.tmp + # less splits.tmp + # 2. convert splits.txt (or the decompressed splits.tmp) into a sequence file + # python seqfileutils.py --copyfrom splits.txt splits.seq + # python seqfileutils.py --copyfrom splits.tmp splits.seq + # 3. verify the sequence file + # hadoop fs -text file:$PWD/splits.seq | less + + mv splits.seq "${CRAWL}-splits.seq" + + if aws s3 ls "s3${SPLIT_FILE#s3a}"; then + echo "Ok, split file has been uploaded" else echo "Uploading split file ..." 
- s3cmd put $(basename s3${SPLIT_FILE#s3a}) s3${SPLIT_FILE#s3a} + aws s3 cp "${CRAWL}-splits.seq" "s3${SPLIT_FILE#s3a}" fi fi -python zipnumclusterjob.py \ +python3 zipnumclusterjob.py \ --shards=300 \ - --splitfile=$SPLIT_FILE \ - --output-dir="$ZIPNUM_CLUSTER_DIR" \ + --splitfile="$SPLIT_FILE" \ + --zipnum-dir="$ZIPNUM_CLUSTER_DIR" \ + --output-dir="$ZIPNUM_OUTPUT_DIR" \ --no-output \ - --cmdenv AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID \ - --cmdenv AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY \ - --jobconf "mapreduce.map.memory.mb=640" \ - --jobconf "mapreduce.map.java.opts=-Xmx512m" \ - --jobconf "mapreduce.reduce.memory.mb=1536" \ - --jobconf "mapreduce.reduce.java.opts=-Xmx1024m" \ - -r hadoop $WARC_CDX + --jobconf "mapreduce.map.memory.mb=1600" \ + --jobconf "mapreduce.map.java.opts=-Xmx1024m" \ + --jobconf "mapreduce.reduce.memory.mb=3072" \ + --jobconf "mapreduce.reduce.java.opts=-Xmx2048m" \ + --jobconf "mapreduce.fileoutputcommitter.cleanup-failures.ignored=true" \ + -r hadoop "$WARC_CDX" diff --git a/runindexwarcs.sh b/runindexwarcs.sh index b4a0ecb..f9bef87 100755 --- a/runindexwarcs.sh +++ b/runindexwarcs.sh @@ -2,11 +2,9 @@ source ./index_env.sh -python indexwarcsjob.py \ +python3 indexwarcsjob.py \ --conf-path ./mrjob.conf \ --cdx_bucket=$WARC_CDX_BUCKET \ --no-output \ ---cmdenv AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID \ ---cmdenv AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY \ -r emr $WARC_MANIFEST &> /tmp/emrrun.log & diff --git a/runsample.sh b/runsample.sh index 138e774..6aca0f7 100755 --- a/runsample.sh +++ b/runsample.sh @@ -2,11 +2,9 @@ source ./index_env.sh -python dosample.py \ +python3 dosample.py \ --shards=300 \ --splitfile=$SPLIT_FILE \ --conf-path ./mrjob.conf \ ---cmdenv AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID \ ---cmdenv AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY \ -r emr $WARC_CDX &> /tmp/emrrun.log & diff --git a/runzipcluster.sh b/runzipcluster.sh index 1e23e86..27b4682 100755 --- a/runzipcluster.sh +++ b/runzipcluster.sh @@ -2,13 +2,11 @@ 
source ./index_env.sh -python zipnumclusterjob.py \ +python3 zipnumclusterjob.py \ --shards=300 \ --splitfile=$SPLIT_FILE \ --output-dir="$ZIPNUM_CLUSTER_DIR" \ --no-output \ --conf-path ./mrjob.conf \ ---cmdenv AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID \ ---cmdenv AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY \ -r emr $WARC_CDX &> /tmp/emrrun2.log & diff --git a/samplecdxjob.py b/samplecdxjob.py index 9091ee7..4d2d305 100644 --- a/samplecdxjob.py +++ b/samplecdxjob.py @@ -1,15 +1,22 @@ +import logging import random from heapq import heappush, heapreplace from mrjob.job import MRJob from mrjob.protocol import RawValueProtocol +from mrjob.util import log_to_stream + + +LOG = logging.getLogger('SampleCDXJob') +log_to_stream(format="%(asctime)s %(levelname)s %(name)s: %(message)s", + name='SampleCDXJob') #============================================================================= class SampleCDXJob(MRJob): """ Sample CDX key space using reservoir sampling MR algorithm adapted: - http://had00b.blogspot.com/2013/07/random-subset-in-mapreduce.html + https://had00b.blogspot.com/2013/07/random-subset-in-mapreduce.html """ HADOOP_INPUT_FORMAT = 'org.apache.hadoop.mapred.lib.CombineTextInputFormat' @@ -24,31 +31,31 @@ class SampleCDXJob(MRJob): 'mapreduce.job.jvm.numtasks': '-1', # the output should not be compressed even if the default is to compress output, - # otherwise reading from MRJobRunner.stream_output() needs decompression on the fly + # otherwise reading from MRJobRunner.cat_output() needs decompression on the fly 'mapreduce.output.fileoutputformat.compress': 'false', 'mapreduce.job.reduces': '1' } - def configure_options(self): + def configure_args(self): """Custom command line options for indexing""" - super(SampleCDXJob, self).configure_options() - - self.add_passthrough_option('--shards', dest='shards', - type=int, - default=300, - help='Number of shards in output ' + - '(create shards-1 splits') - - self.add_passthrough_option('--scaler', dest='scaler', - 
type=int, - default=100, - help='Scaler for sample size: ' + - 'Sample size = shards * scaler') - - self.add_passthrough_option('--splitfile', dest='splitfile', - help='Split file output dest, ' + - 'will contain shards-1 splits') + super(SampleCDXJob, self).configure_args() + + self.add_passthru_arg('--shards', dest='shards', + type=int, + default=300, + help='Number of shards in output ' + + '(create shards-1 splits') + + self.add_passthru_arg('--scaler', dest='scaler', + type=int, + default=100, + help='Scaler for sample size: ' + + 'Sample size = shards * scaler') + + self.add_passthru_arg('--splitfile', dest='splitfile', + help='Split file output dest, ' + + 'will contain shards-1 splits') def mapper_init(self): self.N = self.options.shards * self.options.scaler diff --git a/zipnumcluster_cc_pyspark.py b/zipnumcluster_cc_pyspark.py new file mode 100644 index 0000000..4508652 --- /dev/null +++ b/zipnumcluster_cc_pyspark.py @@ -0,0 +1,303 @@ +import argparse +import json +import logging +import os +import re +import zlib + +from typing import Iterator, Tuple + +import boto3 +import botocore + + +from sparkcc import CCFileProcessorSparkJob + + +class ZipNumClusterCdx(CCFileProcessorSparkJob): + """Spark job to create a ZipNum Sharded CDX index, see + . + The index is sharded over multiple partitions (default = 300). Each partition file + is compressed using gzip, but in chunks of 3000 lines (a configurable number). + Every chunk can be read separately, a jump index allows to find the right chunk + for a given key in a binary search. + """ + + name = 'ZipNumClusterCdx' + + input_descr = """Glob pattern of input CDX files, e.g., file:///path/*/*.cdx.gz +(HDFS-compatible filesystems only: hdfs://, s3a://, file://).""" + output_descr = "Ignored but required (can be empty, no output table is produced)." 
+ + DATA_URL_PATTERN = re.compile('^(s3|https?|file|hdfs|s3a|s3n):(?://([^/]*))?/(.*)') + + + def add_arguments(self, parser): + super(CCFileProcessorSparkJob,self).add_arguments(parser) + parser.add_argument("--output_base_url", required=True, + help="Output destination.") + parser.add_argument("--partition_boundaries_file", required=True, + help="Full path to a JSON file containing partition boundaries. " + "If specified, and does not exist, will be created, otherwise, " + "it will be used.") + parser.add_argument("--temporary_output_base_url", required=True, + help="Temporary output location for per-shard cluster indexes.") + parser.add_argument("--num_lines", type=int, required=False, + default=3000, + help="Number of lines to compress in each chunk") + parser.add_argument("--num_output_partitions", type=int, required=False, + default=300, + help="Number of partitions/shards") + # suppress help for ignored arguments + parser.add_argument("--output_format", help=argparse.SUPPRESS) + parser.add_argument("--output_compression", help=argparse.SUPPRESS) + parser.add_argument("--output_option", help=argparse.SUPPRESS) + + @staticmethod + def parse_line(line): + try: + parts = line.split(' ', 2) + if len(parts) != 3: + return None + surt_key, timestamp, json_str = parts + return ((surt_key, timestamp), json_str) + except: + return None + + @staticmethod + def get_partition_id(key: str, boundaries_data) -> int: + """Determine partition based on range boundaries""" + if not boundaries_data: + return 0 + + # Binary search to find the right partition + left = 0 + right = len(boundaries_data) + + while left < right: + mid = (left + right) // 2 + if mid == len(boundaries_data): + return mid + if key <= boundaries_data[mid]: + right = mid + else: + left = mid + 1 + + return left + + @staticmethod + def write_output_file(uri, fd, base_uri=None): + """ + Write data from stream fd to output file location defined per URI. 
+ A static variant of CCFileProcessorSparkJob.write_output_file(...) + """ + uri_match = ZipNumClusterCdx.DATA_URL_PATTERN.match(uri) + if not uri_match and base_uri: + # relative input URI (path) and base URI defined + uri = base_uri + uri + uri_match = ZipNumClusterCdx.DATA_URL_PATTERN.match(uri) + if uri_match: + (scheme, netloc, path) = uri_match.groups() + else: + # keep local file paths as is + path = uri + scheme = 'file' + netloc = None + + if scheme in {'s3', 's3a', 's3n'}: + bucketname = netloc + output_path = path + try: + client = boto3.client('s3') + client.upload_fileobj(fd, bucketname, path) + except botocore.client.ClientError as exception: + logging.error( + 'Failed to write to S3 {}: {}'.format(output_path, exception)) + + elif scheme in {'http', 'https'}: + raise ValueError('HTTP/HTTPS output not supported') + + elif scheme == 'hdfs': + raise NotImplementedError('HDFS output not implemented') + + else: + logging.info('Writing local file {}'.format(uri)) + if scheme == 'file': + # must be an absolute path + uri = os.path.join('/', path) + else: + base_dir = os.path.abspath(os.path.dirname(__file__)) + uri = os.path.join(base_dir, uri) + os.makedirs(os.path.dirname(uri), exist_ok=True) + with open(uri, 'wb') as f: + f.write(fd.read()) + + @staticmethod + def write_partition_with_global_seq(idx: int, partition_iter: list, + records_per_partition: int, output_base_url: str): + partition_idx_file = f"idx-{idx:05d}.idx" + + # Calculate starting sequence number for this partition + start_seq = (idx * records_per_partition) + 1 if records_per_partition else 1 + + with open(partition_idx_file, 'w', encoding="utf-8") as f: + seq = start_seq + for record in partition_iter: + min_surt, _, min_surt_timestamp, filename, _, offset, length, _ = record + f.write(f"{min_surt} {min_surt_timestamp}\t{filename}\t{offset}\t{length}\t{seq}\n") + seq += 1 + + with open(partition_idx_file, 'rb') as fd: + ZipNumClusterCdx.write_output_file(partition_idx_file, fd, 
output_base_url) + + os.unlink(partition_idx_file) + + return [(partition_idx_file, True)] + + @staticmethod + def process_partition(partition_id: int, partition_iter: Iterator[Tuple[str, Tuple[str, str]]], + num_lines: int, output_base_url: str, temporary_output_base_url: str) \ + -> Iterator[Tuple[str, str, str, str, int, int, int, int]]: + """Process partition with chunked compression and chunk boundary tracking""" + output_filename = f"cdx-{partition_id:05d}.gz" + index_entries = [] + current_offset = 0 + chunk_size = num_lines + + current_chunk = [] + chunk_min_surt = None + chunk_max_surt = None + chunk_min_timestamp = None + + with open(output_filename, 'wb') as f: + for (surt_key, timestamp), json_data in partition_iter: + line = f"{surt_key} {timestamp} {json_data}\n" + if chunk_min_surt is None: + chunk_min_surt = surt_key + chunk_min_timestamp = timestamp + chunk_max_surt = surt_key # Will end up as max since data is sorted + current_chunk.append(line) + + if len(current_chunk) >= chunk_size: + # Compress and write chunk + chunk_data = ''.join(current_chunk).encode('utf-8') + z = zlib.compressobj(6, zlib.DEFLATED, zlib.MAX_WBITS + 16) + compressed = z.compress(chunk_data) + z.flush() + chunk_length = len(compressed) + f.write(compressed) + + # Index entry with chunk boundaries + index_entries.append(( + str(chunk_min_surt), # min surt + str(chunk_max_surt), # max surt + str(chunk_min_timestamp), # capture time + str(output_filename), # filename + int(partition_id), # explicit integer conversion + int(current_offset), # explicit integer conversion + int(chunk_length), # explicit integer conversion + int(len(current_chunk)) # number of records in chunk + )) + + current_offset += chunk_length + current_chunk = [] + chunk_min_surt = None + + # Handle final chunk + if current_chunk: + chunk_data = ''.join(current_chunk).encode('utf-8') + z = zlib.compressobj(6, zlib.DEFLATED, zlib.MAX_WBITS + 16) + compressed = z.compress(chunk_data) + z.flush() + 
chunk_length = len(compressed) + f.write(compressed) + + index_entries.append(( + str(chunk_min_surt), # min surt + str(chunk_max_surt), # max surt + str(chunk_min_timestamp), # capture time + str(output_filename), + int(partition_id), + int(current_offset), + int(chunk_length), + int(len(current_chunk)) + )) + current_chunk = [] + + with open(output_filename, 'rb') as fd: + ZipNumClusterCdx.write_output_file(output_filename, fd, output_base_url) + + os.unlink(output_filename) + + final_files = ZipNumClusterCdx.write_partition_with_global_seq( + partition_id, index_entries, num_lines, temporary_output_base_url) + + return final_files + + def run_job(self, session): + input_url = self.args.input_base_url + self.args.input + num_partitions = self.args.num_output_partitions + boundaries_file_uri = self.args.partition_boundaries_file + num_lines = self.args.num_lines + output_base_url = self.args.output_base_url + temporary_output_base_url = self.args.temporary_output_base_url + + rdd = session.sparkContext.textFile(input_url).map( + self.parse_line).filter(lambda x: x is not None) + + boundaries = None + self.get_logger(session).info(f"Boundaries file: {boundaries_file_uri}") + if boundaries_file_uri and self.check_for_output_file(boundaries_file_uri): + self.get_logger(session).info(f"Boundaries file found, using it: {boundaries_file_uri}") + with self.fetch_file(boundaries_file_uri) as f: + boundaries = list(map(lambda l: tuple(l), json.load(f))) + + else: + # The percentage needs to be pretty small, since the collect + # brings data back to the driver... 
+ # 1/2 percent should be fine + samples = rdd.keys().sample(False, 0.005).collect() + samples.sort() + + # Ensure more even distribution by using quantiles + total_samples = len(samples) + boundaries = [] + for i in range(1, num_partitions): + idx = (i * total_samples) // num_partitions + if idx < len(samples): + boundaries.append(samples[idx]) + + temp_file_name = 'temp_range_boundaries.json' + with open(temp_file_name, 'w', encoding="utf-8") as f: + json.dump(boundaries, f) + + with open(temp_file_name, 'rb') as f: + self.write_output_file(boundaries_file_uri, f) + + os.unlink(temp_file_name) + + self.get_logger(session).info( + f"Boundaries file created: {boundaries_file_uri}") + + rdd = rdd.repartitionAndSortWithinPartitions( + numPartitions=num_partitions, + partitionFunc=lambda k: ZipNumClusterCdx.get_partition_id(k, boundaries)) \ + .mapPartitionsWithIndex( + lambda idx, iter: ZipNumClusterCdx.process_partition( + idx, iter, num_lines, output_base_url, temporary_output_base_url)) \ + .collect() + + # loop over the output files and concatenate them into a single final file + with open('cluster.idx', 'wb') as f: + for idx_file, _ in rdd: + with self.fetch_file(temporary_output_base_url + idx_file) as idx_fd: + for line in idx_fd: + f.write(line) + + with open('cluster.idx', 'rb') as f: + self.write_output_file('cluster.idx', f, output_base_url) + + os.unlink('cluster.idx') + + +if __name__ == "__main__": + job = ZipNumClusterCdx() + job.run() diff --git a/zipnumclusterjob.py b/zipnumclusterjob.py index 8fcfe7b..751782d 100644 --- a/zipnumclusterjob.py +++ b/zipnumclusterjob.py @@ -1,9 +1,10 @@ +import logging import shutil import sys import os import zlib -import urlparse +import urllib.parse import json from tempfile import TemporaryFile @@ -11,6 +12,12 @@ from mrjob.job import MRJob from mrjob.conf import combine_dicts from mrjob.protocol import RawProtocol, RawValueProtocol +from mrjob.util import log_to_stream + + +LOG = 
logging.getLogger('ZipNumClusterJob') +log_to_stream(format="%(asctime)s %(levelname)s %(name)s: %(message)s", + name='ZipNumClusterJob') #============================================================================= @@ -32,28 +39,31 @@ class ZipNumClusterJob(MRJob): 'mapreduce.job.jvm.numtasks': '-1' } - def configure_options(self): + def configure_args(self): """Custom command line options for indexing""" - super(ZipNumClusterJob, self).configure_options() + super(ZipNumClusterJob, self).configure_args() - self.add_passthrough_option('--numlines', dest='numlines', - type=int, - default=3000, - help='Number of lines per gzipped block') + self.add_passthru_arg('--numlines', dest='numlines', + type=int, + default=3000, + help='Number of lines per gzipped block') - self.add_passthrough_option('--splitfile', dest='splitfile', - help='Split file to use for CDX shard split') + self.add_passthru_arg('--splitfile', dest='splitfile', + help='Split file to use for CDX shard split') - self.add_passthrough_option('--convert', dest='convert', - action='store_true', - default=False, - help='Convert CDX through _convert_line() function') + self.add_passthru_arg('--convert', dest='convert', + action='store_true', + default=False, + help='Convert CDX through _convert_line() function') - self.add_passthrough_option('--shards', dest='shards', - type=int, - help='Num ZipNum Shards to create, ' + - '= num of entries in splits + 1' + - '= num of reducers used') + self.add_passthru_arg('--shards', dest='shards', + type=int, + help='Num ZipNum Shards to create, ' + + '= num of entries in splits + 1' + + '= num of reducers used') + + self.add_passthru_arg('--zipnum-dir', dest='zipnum_dir', + help='Upload path / directory to place zipnum CDX files') def jobconf(self): orig_jobconf = super(ZipNumClusterJob, self).jobconf() @@ -96,11 +106,7 @@ def reducer_init(self): self.part_name = 'cdx-%05d.gz' % int(self.part_num) - self.output_dir = 
self._get_prop(['mapreduce_output_fileoutputformat_outputdir', - 'mapred.output.dir', - 'mapred_work_output_dir']) - - assert(self.output_dir) + assert(self.options.zipnum_dir) self.gzip_temp = TemporaryFile(mode='w+b') def reducer(self, key, values): @@ -125,20 +131,29 @@ def reducer_final(self): def _do_upload(self): self.gzip_temp.flush() + self.gzip_temp.seek(0) #TODO: move to generalized put() function - if self.output_dir.startswith('s3://') or self.output_dir.startswith('s3a://'): - import boto - conn = boto.connect_s3() - parts = urlparse.urlsplit(self.output_dir) - - bucket = conn.lookup(parts.netloc) - - cdxkey = bucket.new_key(parts.path + '/' + self.part_name) - cdxkey.set_contents_from_file(self.gzip_temp, rewind=True) + if self.options.zipnum_dir.startswith('s3://') or self.options.zipnum_dir.startswith('s3a://'): + import boto3 + import botocore + boto_config = botocore.client.Config( + read_timeout=180, + retries={'max_attempts' : 20}) + s3client = boto3.client('s3', config=boto_config) + + parts = urllib.parse.urlsplit(self.options.zipnum_dir) + s3key = parts.path.strip('/') + '/' + self.part_name + s3url = parts.scheme + '://' + parts.netloc + '/' + s3key + + LOG.info('Uploading index to ' + s3url) + try: + s3client.upload_fileobj(self.gzip_temp, parts.netloc, s3key) + except botocore.client.ClientError as exception: + LOG.error('Failed to upload {}: {}'.format(s3url, exception)) + return + LOG.info('Successfully uploaded index file: ' + s3url) else: - path = os.path.join(self.output_dir, self.part_name) - - self.gzip_temp.seek(0) + path = os.path.join(self.options.zipnum_dir, self.part_name) with open(path, 'w+b') as target: shutil.copyfileobj(self.gzip_temp, target) @@ -150,7 +165,7 @@ def _write_part(self): offset = self.gzip_temp.tell() - buff = '\n'.join(self.curr_lines) + '\n' + buff = ('\n'.join(self.curr_lines) + '\n').encode('utf-8') self.curr_lines = [] buff = z.compress(buff)