diff --git a/README.md b/README.md
index 9a4c110..0d71fc8 100644
--- a/README.md
+++ b/README.md
@@ -12,6 +12,8 @@ This project provides examples how to process the Common Crawl dataset with [Apa
 + [word count](./word_count.py) (term and document frequency) in Common Crawl's extracted text (WET files)
 
++ [md5](./md5sum.py) run an external command on entire files from a manifest (WARC, WET, WAT, or any other type of file)
+
 + [extract links](./wat_extract_links.py) from WAT files and [construct the (host-level) web graph](./hostlinks_to_graph.py) – for further details about the web graphs see the project [cc-webgraph](https://github.com/commoncrawl/cc-webgraph)
 
 + work with the [columnar URL index](https://commoncrawl.org/2018/03/index-to-warc-files-and-urls-in-columnar-format/) (see also [cc-index-table](https://github.com/commoncrawl/cc-index-table) and the notes about [querying the columnar index](#querying-the-columnar-index)):
 
diff --git a/md5sum.py b/md5sum.py
new file mode 100644
index 0000000..88fe15e
--- /dev/null
+++ b/md5sum.py
@@ -0,0 +1,23 @@
+import subprocess
+import os
+from sparkcc import CCFileProcessorSparkJob
+from pyspark.sql.types import StructType, StructField, StringType
+
+class MD5Sum(CCFileProcessorSparkJob):
+    """MD5 sum of each file"""
+
+    name = "MD5Sum"
+
+    output_schema = StructType([
+        StructField("uri", StringType(), True),
+        StructField("md5", StringType(), True),
+    ])
+
+    def process_file(self, uri, tempfd):
+        proc = subprocess.run(['md5sum', tempfd.name], capture_output=True, check=True, encoding='utf8')
+        digest = proc.stdout.rstrip().split()[0]
+        yield uri, digest
+
+if __name__ == '__main__':
+    job = MD5Sum()
+    job.run()
diff --git a/sparkcc.py b/sparkcc.py
index 6a99ec9..344c7a6 100644
--- a/sparkcc.py
+++ b/sparkcc.py
@@ -5,7 +5,7 @@
 import re
 
 from io import BytesIO
-from tempfile import SpooledTemporaryFile, TemporaryFile
+from tempfile import SpooledTemporaryFile, TemporaryFile, NamedTemporaryFile
 
 import boto3
 import botocore
@@ -56,7 +56,7 @@ class CCSparkJob(object):
     s3client = None
 
     # pattern to split a data URL (<scheme>://<netloc>/<path> or <scheme>:/<path>)
-    data_url_pattern = re.compile('^(s3|https?|file|hdfs):(?://([^/]*))?/(.*)')
+    data_url_pattern = re.compile('^(s3|https?|file|hdfs|s3a|s3n):(?://([^/]*))?/(.*)')
 
     def parse_arguments(self):
@@ -248,7 +248,7 @@ def fetch_warc(self, uri, base_uri=None, offset=-1, length=-1):
         stream = None
 
-        if scheme == 's3':
+        if scheme in ['s3', 's3a', 's3n']:
             bucketname = netloc
             if not bucketname:
                 self.get_logger().error("Invalid S3 URI: " + uri)
                 return
@@ -621,3 +621,257 @@ def run_job(self, session):
             .saveAsTable(self.args.output)
 
         self.log_accumulators(session)
+
+
+class CCFileProcessorSparkJob(CCSparkJob):
+    """
+    A Spark job definition to process entire files from a manifest
+    (as opposed to processing individual WARC records).
+    """
+
+    name = 'CCFileProcessor'
+
+    def add_arguments(self, parser):
+        super(CCFileProcessorSparkJob, self).add_arguments(parser)
+        parser.add_argument("--output_base_uri", required=False,
+                            default='./output',
+                            help="Base URI to write output files to. Useful if your"
+                                 " job uses write_output_file or check_for_output_file.")
+
+    def run_job(self, session):
+        input_data = session.sparkContext.textFile(self.args.input,
+                                                    minPartitions=self.args.num_input_partitions)
+
+        output = input_data.mapPartitionsWithIndex(self.process_files) \
+            .reduceByKey(self.reduce_by_key_func)
+
+        session.createDataFrame(output, schema=self.output_schema) \
+            .coalesce(self.args.num_output_partitions) \
+            .write \
+            .format(self.args.output_format) \
+            .option("compression", self.args.output_compression) \
+            .options(**self.get_output_options()) \
+            .saveAsTable(self.args.output)
+
+        self.log_accumulators(session)
+
+    def fetch_file(self, uri, base_uri=None):
+        """
+        Fetch a file. This is a modified version of fetch_warc:
+        it does not currently support HDFS, but that could be added if needed.
+        A NamedTemporaryFile is used so that external tools which require a
+        real file path can process the downloaded file.
+        """
+
+        (scheme, netloc, path) = (None, None, None)
+        uri_match = self.data_url_pattern.match(uri)
+        if not uri_match and base_uri:
+            # relative input URI (path) and base URI defined
+            uri = base_uri + uri
+            uri_match = self.data_url_pattern.match(uri)
+
+        if uri_match:
+            (scheme, netloc, path) = uri_match.groups()
+        else:
+            # keep local file paths as is
+            path = uri
+
+        warctemp = None
+
+        if scheme in ['s3', 's3a', 's3n']:
+            bucketname = netloc
+            if not bucketname:
+                self.get_logger().error("Invalid S3 URI: " + uri)
+                return
+            if not path:
+                self.get_logger().error("Empty S3 path: " + uri)
+                return
+            elif path[0] == '/':
+                # must strip leading / in S3 path
+                path = path[1:]
+            self.get_logger().info('Reading from S3 {}'.format(uri))
+            # download entire file using a temporary file for buffering
+            warctemp = NamedTemporaryFile(mode='w+b', dir=self.args.local_temp_dir)
+            try:
+                self.get_s3_client().download_fileobj(bucketname, path, warctemp)
+                warctemp.flush()
+                warctemp.seek(0)
+            except botocore.client.ClientError as exception:
+                self.get_logger().error(
+                    'Failed to download {}: {}'.format(uri, exception))
+                self.warc_input_failed.add(1)
+                warctemp.close()
+                return
+        elif scheme == 'http' or scheme == 'https':
+            headers = None
+            self.get_logger().info('Fetching {}'.format(uri))
+            response = requests.get(uri, headers=headers)
+
+            if response.ok:
+                warctemp = NamedTemporaryFile(mode='w+b',
+                                              dir=self.args.local_temp_dir)
+                warctemp.write(response.content)
+                warctemp.flush()
+                warctemp.seek(0)
+            else:
+                self.get_logger().error(
+                    'Failed to download {}: {}'.format(uri, response.status_code))
+                return
+        else:
+            self.get_logger().info('Reading local file {}'.format(uri))
+            if scheme == 'file':
+                # must be an absolute path
+                uri = os.path.join('/', path)
+            else:
+                base_dir = os.path.abspath(os.path.dirname(__file__))
+                uri = os.path.join(base_dir, uri)
+            warctemp = open(uri, 'rb')
+
+        return warctemp
+
+    def process_files(self, _id, iterator):
+        """Process files, calling process_file(...) for each file"""
+        for uri in iterator:
+            self.warc_input_processed.add(1)
+
+            tempfd = self.fetch_file(uri, self.args.input_base_url)
+            if not tempfd:
+                continue
+
+            for res in self.process_file(uri, tempfd):
+                yield res
+
+            tempfd.close()
+
+    def process_file(self, uri, tempfd):
+        """Process a single file"""
+        raise NotImplementedError('Processing file needs to be customized')
+
+    # See if we can get to the bucket referred to in the URI.
+    # Note: if the URI is not an S3 URI, this returns True.
+    def validate_s3_bucket_from_uri(self, uri):
+        """
+        Validate that the bucket referenced by an S3 URI exists and is accessible.
+        """
+        if uri is None or len(uri) == 0:
+            return True
+        (scheme, netloc, path) = (None, None, None)
+        uri_match = self.data_url_pattern.match(uri)
+        if uri_match:
+            (scheme, netloc, path) = uri_match.groups()
+        if scheme in ['s3', 's3a', 's3n']:
+            bucketname = netloc
+            if not bucketname:
+                self.get_logger().error("Invalid S3 URI: " + uri)
+                return False
+            try:
+                self.get_s3_client().head_bucket(Bucket=bucketname)
+            except botocore.exceptions.ClientError as exception:
+                self.get_logger().error(
+                    'Failed to access bucket {}: {}'.format(bucketname, exception))
+                return False
+            return True
+        return True
+
+    # Like fetch_warc, but checks whether a file (local, on S3, etc.) exists.
+    def check_for_output_file(self, uri, base_uri=None):
+        """
+        Check whether an output file exists. This is a modified version of fetch_warc:
+        it does not currently support HDFS, but that could be added if needed.
+        """
+        (scheme, netloc, path) = (None, None, None)
+        uri_match = self.data_url_pattern.match(uri)
+        if not uri_match and base_uri:
+            # relative input URI (path) and base URI defined
+            uri = base_uri + uri
+            uri_match = self.data_url_pattern.match(uri)
+        if uri_match:
+            (scheme, netloc, path) = uri_match.groups()
+        else:
+            # keep local file paths as is
+            path = uri
+        if scheme in ['s3', 's3a', 's3n']:
+            bucketname = netloc
+            if not bucketname:
+                self.get_logger().error("Invalid S3 URI: " + uri)
+                return False
+            if not path:
+                self.get_logger().error("Empty S3 path: " + uri)
+                return False
+            elif path[0] == '/':
+                # must strip leading / in S3 path
+                path = path[1:]
+            self.get_logger().info('Checking if file exists on S3 {}'.format(uri))
+            try:
+                self.get_s3_client().head_object(Bucket=bucketname, Key=path)
+                return True
+            except botocore.client.ClientError as exception:
+                if exception.response['Error']['Code'] == '404':
+                    return False
+                self.get_logger().error(
+                    'Failed to check if file exists on S3 {}: {}'.format(uri, exception))
+                return False
+        elif scheme == 'http' or scheme == 'https':
+            headers = None
+            self.get_logger().info('Checking if file exists {}'.format(uri))
+            response = requests.head(uri, headers=headers)
+            if response.ok:
+                return True
+            else:
+                self.get_logger().error(
+                    'Failed to check if file exists {}: {}'.format(uri, response.status_code))
+                return False
+        else:
+            self.get_logger().info('Checking if local file exists {}'.format(uri))
+            if scheme == 'file':
+                # must be an absolute path
+                uri = os.path.join('/', path)
+            else:
+                base_dir = os.path.abspath(os.path.dirname(__file__))
+                uri = os.path.join(base_dir, uri)
+            return os.path.exists(uri)
+
+    # Like fetch_warc, but writes a file to local disk or S3.
+    def write_output_file(self, uri, fd, base_uri=None):
+        """
+        Write a file. This is a modified version of fetch_warc:
+        it does not currently support HDFS, but that could be added if needed.
+        """
+        (scheme, netloc, path) = (None, None, None)
+        uri_match = self.data_url_pattern.match(uri)
+        if not uri_match and base_uri:
+            # relative input URI (path) and base URI defined
+            uri = base_uri + uri
+            uri_match = self.data_url_pattern.match(uri)
+        if uri_match:
+            (scheme, netloc, path) = uri_match.groups()
+        else:
+            # keep local file paths as is
+            path = uri
+        if scheme in ['s3', 's3a', 's3n']:
+            bucketname = netloc
+            if not bucketname:
+                self.get_logger().error("Invalid S3 URI: " + uri)
+                return
+            if not path:
+                self.get_logger().error("Empty S3 path: " + uri)
+                return
+            elif path[0] == '/':
+                # must strip leading / in S3 path
+                path = path[1:]
+            self.get_logger().info('Writing to S3 {}'.format(uri))
+            try:
+                self.get_s3_client().upload_fileobj(fd, bucketname, path)
+            except botocore.client.ClientError as exception:
+                self.get_logger().error(
+                    'Failed to write to S3 {}: {}'.format(uri, exception))
+        else:
+            self.get_logger().info('Writing local file {}'.format(uri))
+            if scheme == 'file':
+                # must be an absolute path
+                uri = os.path.join('/', path)
+            else:
+                base_dir = os.path.abspath(os.path.dirname(__file__))
+                uri = os.path.join(base_dir, uri)
+            os.makedirs(os.path.dirname(uri), exist_ok=True)
+            with open(uri, 'wb') as f:
+                f.write(fd.read())
\ No newline at end of file
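The patch introduces `--output_base_uri`, `check_for_output_file` and `write_output_file`, but nothing in the change exercises them. Below is a minimal sketch of how a job built on `CCFileProcessorSparkJob` might combine them; the `CopyFiles` class, its `status` column, and the output naming scheme are illustrative assumptions, not part of the patch.

```python
from sparkcc import CCFileProcessorSparkJob
from pyspark.sql.types import StructType, StructField, StringType


class CopyFiles(CCFileProcessorSparkJob):
    """Hypothetical job: copy each input file below --output_base_uri,
    skipping files whose output already exists."""

    name = "CopyFiles"

    output_schema = StructType([
        StructField("uri", StringType(), True),
        StructField("status", StringType(), True),
    ])

    def process_file(self, uri, tempfd):
        # derive an output location from the input URI (illustrative only)
        out_uri = self.args.output_base_uri.rstrip('/') + '/' + uri.split('/')[-1]

        # skip the copy if the output (local file or S3 object) is already there
        if self.check_for_output_file(out_uri):
            yield uri, 'skipped'
            return

        # tempfd is positioned at the start of the fetched file
        self.write_output_file(out_uri, tempfd)
        yield uri, 'copied'


if __name__ == '__main__':
    job = CopyFiles()
    job.run()
```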
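The md5sum.py example shells out to `md5sum` deliberately: it demonstrates handing a real file path (`tempfd.name`, available because fetch_file uses NamedTemporaryFile) to an external tool. If only the digest were needed, a `process_file` override could hash the stream in-process instead; the following variant is a sketch under that assumption and is likewise not part of the patch.

```python
import hashlib

from md5sum import MD5Sum


class MD5SumInProcess(MD5Sum):
    """Hypothetical variant that computes the digest without an external command."""

    name = "MD5SumInProcess"

    def process_file(self, uri, tempfd):
        md5 = hashlib.md5()
        # read the temporary file in 1 MiB chunks to keep memory use low
        for chunk in iter(lambda: tempfd.read(1024 * 1024), b''):
            md5.update(chunk)
        yield uri, md5.hexdigest()


if __name__ == '__main__':
    job = MD5SumInProcess()
    job.run()
```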