Add CCFileProcessorSparkJob to support file-wise processing #45

Open · wants to merge 7 commits into base: main

Changes from all commits
2 changes: 2 additions & 0 deletions README.md
@@ -12,6 +12,8 @@ This project provides examples how to process the Common Crawl dataset with [Apa

+ [word count](./word_count.py) (term and document frequency) in Common Crawl's extracted text (WET files)

+ [md5](./md5sum.py) run an external command on entire files from a manifest (WARC, WET, WAT, or any other type of file)

+ [extract links](./wat_extract_links.py) from WAT files and [construct the (host-level) web graph](./hostlinks_to_graph.py) – for further details about the web graphs see the project [cc-webgraph](https://github.com/commoncrawl/cc-webgraph)

+ work with the [columnar URL index](https://commoncrawl.org/2018/03/index-to-warc-files-and-urls-in-columnar-format/) (see also [cc-index-table](https://github.com/commoncrawl/cc-index-table) and the notes about [querying the columnar index](#querying-the-columnar-index)):
23 changes: 23 additions & 0 deletions md5sum.py
@@ -0,0 +1,23 @@
import subprocess
import os
from sparkcc import CCFileProcessorSparkJob
from pyspark.sql.types import StructType, StructField, StringType

class MD5Sum(CCFileProcessorSparkJob):
""" MD5 sum of each file"""

name = "MD5Sum"

output_schema = StructType([
StructField("uri", StringType(), True),
StructField("md5", StringType(), True),
])

def process_file(self, uri, tempfd):
proc = subprocess.run(['md5sum', tempfd.name], capture_output=True, check=True, encoding='utf8')
digest = proc.stdout.rstrip().split()[0]
yield uri, digest

if __name__ == '__main__':
job = MD5Sum()
job.run()
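
Note: process_file() receives the open NamedTemporaryFile created by fetch_file(), so the digest could just as well be computed in pure Python instead of shelling out to md5sum. A minimal sketch of that variant follows (not part of this PR; the class name MD5SumPure is hypothetical):

import hashlib

from sparkcc import CCFileProcessorSparkJob
from pyspark.sql.types import StructType, StructField, StringType


class MD5SumPure(CCFileProcessorSparkJob):
    """MD5 sum of each file, computed with hashlib instead of an external command"""

    name = "MD5SumPure"

    output_schema = StructType([
        StructField("uri", StringType(), True),
        StructField("md5", StringType(), True),
    ])

    def process_file(self, uri, tempfd):
        md5 = hashlib.md5()
        # read the temporary file in chunks to keep memory usage flat
        for chunk in iter(lambda: tempfd.read(1024 * 1024), b''):
            md5.update(chunk)
        yield uri, md5.hexdigest()


if __name__ == '__main__':
    job = MD5SumPure()
    job.run()

Like the other cc-pyspark example jobs, either variant would be launched via spark-submit with the manifest file and the output table name as positional arguments.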
260 changes: 257 additions & 3 deletions sparkcc.py
@@ -5,7 +5,7 @@
import re

from io import BytesIO
from tempfile import SpooledTemporaryFile, TemporaryFile
from tempfile import SpooledTemporaryFile, TemporaryFile, NamedTemporaryFile

import boto3
import botocore
@@ -56,7 +56,7 @@ class CCSparkJob(object):
s3client = None

# pattern to split a data URL (<scheme>://<netloc>/<path> or <scheme>:/<path>)
data_url_pattern = re.compile('^(s3|https?|file|hdfs):(?://([^/]*))?/(.*)')
data_url_pattern = re.compile('^(s3|https?|file|hdfs|s3a|s3n):(?://([^/]*))?/(.*)')


def parse_arguments(self):
@@ -248,7 +248,7 @@ def fetch_warc(self, uri, base_uri=None, offset=-1, length=-1):

stream = None

if scheme == 's3':
if scheme in ['s3', 's3a', 's3n']:
bucketname = netloc
if not bucketname:
self.get_logger().error("Invalid S3 URI: " + uri)
@@ -621,3 +621,257 @@ def run_job(self, session):
.saveAsTable(self.args.output)

self.log_accumulators(session)


class CCFileProcessorSparkJob(CCSparkJob):
"""
A Spark job definition to process entire files from a manifest
(as opposed to processing individual WARC records).
"""

name = 'CCFileProcessor'

def add_arguments(self, parser):
super(CCFileProcessorSparkJob, self).add_arguments(parser)
parser.add_argument("--output_base_uri", required=False,
default='./output',
help="Base URI to write output files to. Useful if your job uses write_output_file or check_for_output_file.")

def run_job(self, session):
input_data = session.sparkContext.textFile(self.args.input,
minPartitions=self.args.num_input_partitions)

output = input_data.mapPartitionsWithIndex(self.process_files) \
.reduceByKey(self.reduce_by_key_func)

session.createDataFrame(output, schema=self.output_schema) \
.coalesce(self.args.num_output_partitions) \
.write \
.format(self.args.output_format) \
.option("compression", self.args.output_compression) \
.options(**self.get_output_options()) \
.saveAsTable(self.args.output)

self.log_accumulators(session)

def fetch_file(self, uri, base_uri=None):
"""
Fetch a file into a local temporary file. This is a modified version of
fetch_warc: it does not currently support hdfs, but that could be added
if needed. It uses NamedTemporaryFile so that external tools which
require a real file path can be run on the downloaded file.
"""

(scheme, netloc, path) = (None, None, None)
uri_match = self.data_url_pattern.match(uri)
if not uri_match and base_uri:
# relative input URI (path) and base URI defined
uri = base_uri + uri
uri_match = self.data_url_pattern.match(uri)

if uri_match:
(scheme, netloc, path) = uri_match.groups()
else:
# keep local file paths as is
path = uri

warctemp = None

if scheme in ['s3', 's3a', 's3n']:
bucketname = netloc
if not bucketname:
self.get_logger().error("Invalid S3 URI: " + uri)
return
if not path:
self.get_logger().error("Empty S3 path: " + uri)
return
elif path[0] == '/':
# must strip leading / in S3 path
path = path[1:]
self.get_logger().info('Reading from S3 {}'.format(uri))
# download entire file using a temporary file for buffering
warctemp = NamedTemporaryFile(mode='w+b', dir=self.args.local_temp_dir)
try:
self.get_s3_client().download_fileobj(bucketname, path, warctemp)
warctemp.flush()
warctemp.seek(0)
except botocore.client.ClientError as exception:
self.get_logger().error(
'Failed to download {}: {}'.format(uri, exception))
self.warc_input_failed.add(1)
warctemp.close()
return
elif scheme == 'http' or scheme == 'https':
headers = None
self.get_logger().info('Fetching {}'.format(uri))
response = requests.get(uri, headers=headers)

if response.ok:
# includes "HTTP 206 Partial Content" for range requests
warctemp = NamedTemporaryFile(mode='w+b',
dir=self.args.local_temp_dir)
warctemp.write(response.content)
warctemp.flush()
warctemp.seek(0)
else:
self.get_logger().error(
'Failed to download {}: {}'.format(uri, response.status_code))
return
else:
self.get_logger().info('Reading local file {}'.format(uri))
if scheme == 'file':
# must be an absolute path
uri = os.path.join('/', path)
else:
base_dir = os.path.abspath(os.path.dirname(__file__))
uri = os.path.join(base_dir, uri)
warctemp = open(uri, 'rb')

return warctemp

def process_files(self, _id, iterator):
"""Process files, calling process_file(...) for each file"""
for uri in iterator:
self.warc_input_processed.add(1)

tempfd = self.fetch_file(uri, self.args.input_base_url)
if not tempfd:
continue

for res in self.process_file(uri, tempfd):
yield res

tempfd.close()

def process_file(self, uri, tempfd):
"""Process a single file"""
raise NotImplementedError('Processing file needs to be customized')

def validate_s3_bucket_from_uri(self, uri):
"""
Check that the bucket referenced by an S3 URI exists and is accessible.
Returns True if the URI is empty or does not point to S3.
"""
if uri is None or len(uri) == 0:
return True
(scheme, netloc, path) = (None, None, None)
uri_match = self.data_url_pattern.match(uri)
if uri_match:
(scheme, netloc, path) = uri_match.groups()
if scheme in ['s3', 's3a', 's3n']:
bucketname = netloc
if not bucketname:
self.get_logger().error("Invalid S3 URI: " + uri)
return False
try:
self.get_s3_client().head_bucket(Bucket=bucketname)
except botocore.exceptions.ClientError as exception:
self.get_logger().error(
'Failed to access bucket {}: {}'.format(bucketname, exception))
return False
return True
return True


def check_for_output_file(self, uri, base_uri=None):
"""
Check whether an output file exists locally, on S3, or via HTTP.
The URI handling mirrors fetch_warc; hdfs is not currently supported,
but could be added if needed.
"""
(scheme, netloc, path) = (None, None, None)
uri_match = self.data_url_pattern.match(uri)
if not uri_match and base_uri:
# relative input URI (path) and base URI defined
uri = base_uri + uri
uri_match = self.data_url_pattern.match(uri)
if uri_match:
(scheme, netloc, path) = uri_match.groups()
else:
# keep local file paths as is
path = uri
if scheme in ['s3', 's3a', 's3n']:
bucketname = netloc
if not bucketname:
self.get_logger().error("Invalid S3 URI: " + uri)
return
if not path:
self.get_logger().error("Empty S3 path: " + uri)
return
elif path[0] == '/':
# must strip leading / in S3 path
path = path[1:]
self.get_logger().info('Checking if file exists on S3 {}'.format(uri))
try:
self.get_s3_client().head_object(Bucket=bucketname, Key=path)
return True
except botocore.client.ClientError as exception:
if exception.response['Error']['Code'] == '404':
return False
self.get_logger().error(
'Failed to check if file exists on S3 {}: {}'.format(uri, exception))
return False
elif scheme == 'http' or scheme == 'https':
headers = None
self.get_logger().info('Checking if file exists {}'.format(uri))
response = requests.head(uri, headers=headers)
if response.ok:
return True
else:
self.get_logger().error(
'Failed to check if file exists {}: {}'.format(uri, response.status_code))
return False
else:
self.get_logger().info('Checking if local file exists {}'.format(uri))
if scheme == 'file':
# must be an absolute path
uri = os.path.join('/', path)
else:
base_dir = os.path.abspath(os.path.dirname(__file__))
uri = os.path.join(base_dir, uri)
return os.path.exists(uri)

def write_output_file(self, uri, fd, base_uri=None):
"""
Write a file to a local path or to S3. The URI handling mirrors
fetch_warc; hdfs is not currently supported, but could be added
if needed.
"""
(scheme, netloc, path) = (None, None, None)
uri_match = self.data_url_pattern.match(uri)
if not uri_match and base_uri:
# relative input URI (path) and base URI defined
uri = base_uri + uri
uri_match = self.data_url_pattern.match(uri)
if uri_match:
(scheme, netloc, path) = uri_match.groups()
else:
# keep local file paths as is
path = uri
if scheme in ['s3', 's3a', 's3n']:
bucketname = netloc
if not bucketname:
self.get_logger().error("Invalid S3 URI: " + uri)
return
if not path:
self.get_logger().error("Empty S3 path: " + uri)
return
elif path[0] == '/':
# must strip leading / in S3 path
path = path[1:]
self.get_logger().info('Writing to S3 {}'.format(uri))
try:
self.get_s3_client().upload_fileobj(fd, bucketname, path)
except botocore.client.ClientError as exception:
self.get_logger().error(
'Failed to write to S3 {}: {}'.format(uri, exception))
else:
self.get_logger().info('Writing local file {}'.format(uri))
if scheme == 'file':
# must be an absolute path
uri = os.path.join('/', path)
else:
base_dir = os.path.abspath(os.path.dirname(__file__))
uri = os.path.join(base_dir, uri)
os.makedirs(os.path.dirname(uri), exist_ok=True)
with open(uri, 'wb') as f:
f.write(fd.read())
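
As a further illustration of how the new helpers fit together, here is a hedged sketch (not part of this PR) of a file processor that uses check_for_output_file() to skip already-processed files and write_output_file() to store each input file under --output_base_uri; the class name CopyFiles and the bytes_copied column are hypothetical:

import os

from sparkcc import CCFileProcessorSparkJob
from pyspark.sql.types import StructType, StructField, StringType, LongType


class CopyFiles(CCFileProcessorSparkJob):
    """Copy each file listed in the manifest to --output_base_uri"""

    name = "CopyFiles"

    output_schema = StructType([
        StructField("uri", StringType(), True),
        StructField("bytes_copied", LongType(), True),
    ])

    def process_file(self, uri, tempfd):
        # place the copy under the output base URI, keyed by the file name
        target = os.path.join(self.args.output_base_uri, os.path.basename(uri))
        if self.check_for_output_file(target):
            # already copied by an earlier run, nothing to do
            yield uri, 0
            return
        size = os.path.getsize(tempfd.name)
        self.write_output_file(target, tempfd)
        yield uri, size


if __name__ == '__main__':
    job = CopyFiles()
    job.run()

Checking for the output file first makes re-runs of a partially completed job cheap, since files that were already copied are skipped.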