-
Notifications
You must be signed in to change notification settings - Fork 38
/
Copy pathput_crossref_in_s3.py
117 lines (89 loc) · 3.8 KB
/
put_crossref_in_s3.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import argparse
import datetime
import json
import os
import time
from urllib.parse import quote

import boto3
import pytz
import requests
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

from app import logging
CROSSREF_API_KEY = os.getenv('CROSSREF_API_KEY')
@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=4, max=10),
       retry=retry_if_exception_type(requests.exceptions.RequestException))
def make_request_with_retry(url, headers):
    """GET *url* with automatic retries on transient failures.

    Retries (via tenacity, up to 5 attempts with exponential backoff) on any
    requests.exceptions.RequestException, which includes the HTTPError raised
    below for error status codes as well as connection/timeout errors.

    :param url: fully-formed request URL.
    :param headers: dict of HTTP headers to send.
    :return: the successful requests.Response.
    :raises requests.exceptions.HTTPError: once all retries are exhausted.
    """
    response = requests.get(url, headers=headers)
    if response.status_code == 429:
        # Honor the server's Retry-After hint before tenacity's own backoff
        # kicks in, so we don't hammer a rate-limited endpoint.
        retry_after = int(response.headers.get('Retry-After', 60))
        logging.warning(f"Rate limit exceeded (429). Retrying after {retry_after} seconds.")
        time.sleep(retry_after)
    elif response.status_code >= 500:
        logging.error(f"Server error {response.status_code} for URL {url}. Retrying...")
    # Raise for ANY error status. The original only raised inside the 429 and
    # 5xx branches, so other 4xx responses (400, 401, 404, ...) were returned
    # silently and blew up later in the caller when parsing the body.
    response.raise_for_status()
    return response
def get_crossref_data(filter_params, s3_bucket, s3_prefix):
    """Page through the Crossref /works API and save each page of items to S3.

    Uses Crossref deep-paging: each response carries a 'next-cursor' that is
    passed back on the following request until no items (or no cursor) remain.

    :param filter_params: value for the Crossref `filter=` query parameter,
        e.g. 'from-index-date:2024-01-01,until-index-date:2024-01-02'.
    :param s3_bucket: destination S3 bucket name.
    :param s3_prefix: key prefix under which page files are written.
    """
    base_url = 'https://api.crossref.org/works'
    headers = {
        "Accept": "application/json",
        "User-Agent": "mailto:[email protected]",
        "crossref-api-key": CROSSREF_API_KEY
    }
    per_page = 500
    cursor = '*'
    page_number = 1
    while True:
        # Cursors are opaque tokens that may contain '+', '=' etc., so they
        # must be URL-encoded before being placed in the query string.
        url = f"{base_url}?filter={filter_params}&rows={per_page}&cursor={quote(cursor, safe='')}"
        response = make_request_with_retry(url, headers)
        logging.info(f"Requesting page {page_number} from URL {url}.")
        data = response.json()
        items = data['message']['items']
        if not items:
            logging.info(f"No more items to fetch on page {page_number}. Ending pagination.")
            break
        current_timestamp = datetime.datetime.now().isoformat()
        s3_key = f'{s3_prefix}/works_page_{page_number}_{current_timestamp}.json'
        save_to_s3(items, s3_bucket, s3_key)
        # BUG FIX: the original read data['message']['next-cursor']
        # unconditionally after checking for its absence, so a missing cursor
        # raised KeyError instead of ending pagination cleanly.
        next_cursor = data['message'].get('next-cursor')
        if not next_cursor:
            logging.info("No next cursor found, pagination complete.")
            break
        cursor = next_cursor
        page_number += 1
        time.sleep(.5)  # light throttle between pages, per polite-use guidance
def save_to_s3(json_data, s3_bucket, s3_key):
    """Serialize *json_data* as JSON and upload it to s3://{s3_bucket}/{s3_key}.

    :param json_data: any JSON-serializable object (here, a list of Crossref works).
    :param s3_bucket: destination bucket name.
    :param s3_key: full object key within the bucket.
    """
    logging.info(f"Saving crossref works to S3 bucket {s3_bucket} with key {s3_key}.")
    client = boto3.client('s3')
    payload = json.dumps(json_data)
    client.put_object(
        Bucket=s3_bucket,
        Key=s3_key,
        Body=payload,
        ContentType='application/json',
    )
def main():
    """CLI entry point: pull Crossref works for the requested window into S3.

    Modes:
      * new                  -- works created between yesterday and today.
      * updates              -- works re-indexed between two days ago and yesterday.
      * updates_two_days_ago -- works re-indexed between three and two days ago.
    """
    parser = argparse.ArgumentParser(description='Pull Crossref data (new works or updates).')
    parser.add_argument('mode', choices=['new', 'updates_two_days_ago', 'updates'], help='Specify whether to pull new works or updates.')
    args = parser.parse_args()
    s3_bucket = 'openalex-sandbox'

    # All date math is done in UTC so the windows are stable regardless of
    # where/when the job runs.
    now = datetime.datetime.now(pytz.utc)
    today_str = now.strftime('%Y-%m-%d')
    yesterday = now - datetime.timedelta(days=1)
    yesterday_str = yesterday.strftime('%Y-%m-%d')
    two_days_ago = now - datetime.timedelta(days=2)
    two_days_ago_str = two_days_ago.strftime('%Y-%m-%d')
    three_days_ago = now - datetime.timedelta(days=3)
    three_days_ago_str = three_days_ago.strftime('%Y-%m-%d')

    if args.mode == 'new':
        filter_params = f'from-created-date:{yesterday_str},until-created-date:{today_str}'
        s3_prefix = f'openalex-elt/crossref/new-works/{now.strftime("%Y/%m/%d/%H")}'
    elif args.mode == 'updates':
        filter_params = f'from-index-date:{two_days_ago_str},until-index-date:{yesterday_str}'
        s3_prefix = f'openalex-elt/crossref/updates/{yesterday.strftime("%Y/%m/%d")}'
    else:  # 'updates_two_days_ago'
        # BUG FIX: this mode was an accepted argparse choice but had no
        # branch, so selecting it parsed successfully and then silently did
        # nothing. NOTE(review): window inferred as the 'updates' pattern
        # shifted back one day -- confirm intended range with the job owner.
        filter_params = f'from-index-date:{three_days_ago_str},until-index-date:{two_days_ago_str}'
        s3_prefix = f'openalex-elt/crossref/updates/{two_days_ago.strftime("%Y/%m/%d")}'

    get_crossref_data(filter_params, s3_bucket, s3_prefix)


if __name__ == '__main__':
    main()