Removed types

ErnestaP committed Mar 4, 2024
1 parent 9f8d982 commit 861ae5d
Showing 53 changed files with 196 additions and 241 deletions.
2 changes: 1 addition & 1 deletion dags/aps/aps_api_client.py
@@ -47,7 +47,7 @@ def get_xml_file(self, doi):
         response_content = request.get_response_bytes()
         return response_content
 
-    def __repr__(self) -> str:
+    def __repr__(self):
         return f"<APSApiClient base_url={self.base_url}, \
             path_segments={self.path_segments}, \
             parameters={self.parameters}>"
12 changes: 6 additions & 6 deletions dags/aps/aps_params.py
@@ -4,11 +4,11 @@
 class APSParams:
     def __init__(
         self,
-        from_date: str = (date.today() - timedelta(days=1)).strftime("%Y-%m-%d"),
-        until_date: str = date.today().strftime("%Y-%m-%d"),
-        date: str = "modified",
-        journals: str = "",
-        set: str = "scoap3",
+        from_date=(date.today() - timedelta(days=1)).strftime("%Y-%m-%d"),
+        until_date=date.today().strftime("%Y-%m-%d"),
+        date="modified",
+        journals="",
+        set="scoap3",
         per_page: int = 100,
     ):
         self.from_date = from_date
@@ -18,7 +18,7 @@ def __init__(
         self.set = set
         self.per_page = per_page
 
-    def get_params(self) -> dict:
+    def get_params(self):
         params = {
             "from": self.from_date,
             "until": self.until_date,
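The removed annotations were documentation only; callers are unaffected. A minimal usage sketch based on the two hunks above (the params dict has more keys than the visible "from" and "until", but those are elided here):

    from aps.aps_params import APSParams

    # Explicit window; with no arguments the defaults cover yesterday-to-today.
    params = APSParams(from_date="2024-03-01", until_date="2024-03-04")

    query = params.get_params()
    assert query["from"] == "2024-03-01"
    assert query["until"] == "2024-03-04"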
7 changes: 3 additions & 4 deletions dags/aps/aps_pull_api.py
@@ -8,7 +8,6 @@
 from aps.aps_params import APSParams
 from aps.repository import APSRepository
 from aps.utils import save_file_in_s3, split_json, trigger_file_processing_DAG
-from common.repository import IRepository
 from common.utils import set_harvesting_interval
 
 
@@ -19,11 +18,11 @@
 )
 def aps_pull_api():
     @task()
-    def set_fetching_intervals(repo: IRepository = APSRepository(), **kwargs):
+    def set_fetching_intervals(repo=APSRepository(), **kwargs):
         return set_harvesting_interval(repo=repo, **kwargs)
 
     @task()
-    def save_json_in_s3(dates: dict, repo: IRepository = APSRepository(), **kwargs):
+    def save_json_in_s3(dates: dict, repo=APSRepository(), **kwargs):
         parameters = APSParams(
             from_date=dates["from_date"],
             until_date=dates["until_date"],
@@ -41,7 +40,7 @@ def save_json_in_s3(dates: dict, repo: IRepository = APSRepository(), **kwargs):
         return None
 
     @task()
-    def trigger_files_processing(key, repo: IRepository = APSRepository()):
+    def trigger_files_processing(key, repo=APSRepository()):
         if key is None:
             logging.warning("No new files were downloaded to s3")
             return
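A note on the repo=APSRepository() pattern, unchanged by this commit: Python evaluates default arguments once, when the function is defined, so every task run shares one repository instance. A self-contained sketch of that rule using a hypothetical Tracker class (no SCOAP3 code involved):

    class Tracker:
        instances = 0

        def __init__(self):
            Tracker.instances += 1

    def fetch(repo=Tracker()):  # default is built once, at definition time
        return repo

    assert fetch() is fetch()      # both calls see the same default object
    assert Tracker.instances == 1  # Tracker() ran only when fetch was defined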
4 changes: 2 additions & 2 deletions dags/aps/repository.py
@@ -18,7 +18,7 @@ def find_all(self):
             files.append(file_name)
         return files
 
-    def get_by_id(self, id: str):
+    def get_by_id(self, id):
         retfile = io.BytesIO()
         self.s3_bucket.download_fileobj(id, retfile)
         return retfile
@@ -30,7 +30,7 @@ def find_the_last_uploaded_file_date(self):
         dates = [obj.last_modified.strftime("%Y-%m-%d") for obj in objects]
         return max(dates)
 
-    def save(self, key: str, obj: IO):
+    def save(self, key, obj):
         self.s3_bucket.upload_fileobj(obj, key)
 
     def delete_all(self):
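Since annotations are not enforced at runtime, save() and get_by_id() behave exactly as before. A round-trip sketch; APSRepository() is constructed with no arguments as in aps_pull_api.py above, and is assumed to wire up self.s3_bucket internally:

    import io

    from aps.repository import APSRepository

    repo = APSRepository()
    payload = io.BytesIO(b'{"data": []}')
    repo.save("aps/2024-03-04.json", payload)         # upload_fileobj to S3
    restored = repo.get_by_id("aps/2024-03-04.json")  # downloads into a BytesIO
    assert restored.getvalue() == b'{"data": []}'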
2 changes: 0 additions & 2 deletions dags/aps/utils.py
@@ -23,8 +23,6 @@ def split_json(repo, key):
     data = json.loads(_file.getvalue().decode("utf-8"))["data"]
     ids_and_articles = []
     for article in data:
-        doi = article["identifiers"]["doi"]
-        today = datetime.now().strftime("%Y-%m-%dT%H:%M")
        _id = _generate_id("APS")
        ids_and_articles.append({"id": _id, "article": article})
    return ids_and_articles
2 changes: 1 addition & 1 deletion dags/common/cleanup.py
@@ -5,7 +5,7 @@
 from bleach.sanitizer import Cleaner
 
 
-def clean_whitespace_characters(input: str):
+def clean_whitespace_characters(input):
     return " ".join(input.split())
 
 
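For reference, the behavior of the function touched here: str.split() with no separator splits on any run of whitespace, so tabs and newlines collapse to single spaces and the ends are trimmed.

    from common.cleanup import clean_whitespace_characters

    assert clean_whitespace_characters(" too\tmany \n spaces ") == "too many spaces"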
9 changes: 4 additions & 5 deletions dags/common/enricher.py
@@ -1,7 +1,6 @@
 import os
 import re
 import xml.etree.ElementTree as ET
-from typing import Dict
 
 import backoff
 import requests
@@ -11,7 +10,7 @@
 
 
 class Enricher(object):
-    def __init__(self) -> None:
+    def __init__(self):
         self.logger = get_logger().bind(class_name=type(self).__name__)
 
     def _get_schema(self):
@@ -25,7 +24,7 @@ def _clean_arxiv(self, arxiv):
         except AttributeError:
             return None
 
-    def _get_arxiv_categories_from_response_xml(self, xml: ET.Element):
+    def _get_arxiv_categories_from_response_xml(self, xml):
         xml_namespaces = {
             "arxiv": "http://arxiv.org/schemas/atom",
             "w3": "http://www.w3.org/2005/Atom",
@@ -102,13 +101,13 @@ def _get_arxiv_categories(self, arxiv_id=None, title=None, doi=None):
         response.raise_for_status()
         return categories
 
-    def _set_categories(self, eprint: Dict):
+    def _set_categories(self, eprint):
         if eprint["value"]:
             eprint["categories"] = self._get_arxiv_categories(eprint["value"])
             eprint["value"] = self._clean_arxiv(eprint["value"])
         return eprint
 
-    def __call__(self, article: Dict):
+    def __call__(self, article):
         enriched_article = article.copy()
         enriched_article.update(
             {
2 changes: 1 addition & 1 deletion dags/common/parsing/extractor.py
@@ -1,5 +1,5 @@
 class IExtractor:
-    def __init__(self, destination) -> None:
+    def __init__(self, destination):
         self.destination = destination
 
     def extract(self, root):
4 changes: 2 additions & 2 deletions dags/common/parsing/json_extractors.py
@@ -16,7 +16,7 @@ def __init__(
         self.required = required
         self.extra_function = extra_function
 
-    def extract(self, article: dict):
+    def extract(self, article):
         return self.extra_function(get_value(article, self.json_path, default=""))
 
 
@@ -26,5 +26,5 @@ def __init__(self, destination, extraction_function) -> None:
         self.destination = destination
         self.extraction_function = extraction_function
 
-    def extract(self, article: dict):
+    def extract(self, article):
         return self.extraction_function(article)
11 changes: 5 additions & 6 deletions dags/common/parsing/parser.py
@@ -1,4 +1,3 @@
-import xml.etree.ElementTree as ET
 from typing import Dict, List
 
 from common.cleanup import (
@@ -31,10 +30,10 @@ def pipe_functions(functions, value):
 class IParser:
     extractors: List[IExtractor]
 
-    def __init__(self, extractors) -> None:
+    def __init__(self, extractors):
         self.extractors = extractors
 
-    def _publisher_specific_parsing(self, article: ET.Element):
+    def _publisher_specific_parsing(self, article):
         extracted_value = {
             extractor.destination: value
             for extractor in self.extractors
@@ -93,17 +92,17 @@ def get(field, default=[]):
 
         return parsed_article
 
-    def parse(self, article: ET.Element):
+    def parse(self, article):
         publisher_parsed_article = self._publisher_specific_parsing(article)
         return self._generic_parsing(publisher_parsed_article)
 
 
 class ObjectExtractor(IParser, IExtractor):
-    def __init__(self, destination, extractors, extra_function=lambda x: x) -> None:
+    def __init__(self, destination, extractors, extra_function=lambda x: x):
         super().__init__(destination)
         self.destination = destination
         self.extractors = extractors
         self.extra_function = extra_function
 
-    def extract(self, article: ET.Element):
+    def extract(self, article):
         return self.extra_function(super()._publisher_specific_parsing(article))
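ObjectExtractor mixes IParser and IExtractor, so nested structures can be declared by composing extractors. A sketch of that composition, pairing it with CustomExtractor from the next file (the elided body of _publisher_specific_parsing is assumed to keep values that pass its checks):

    import xml.etree.ElementTree as ET

    from common.parsing.parser import ObjectExtractor
    from common.parsing.xml_extractors import CustomExtractor

    article = ET.fromstring(
        "<article><journal><title>PRL</title></journal></article>"
    )

    journal = ObjectExtractor(
        destination="journal",
        extractors=[
            CustomExtractor("title", lambda node: node.findtext("journal/title")),
        ],
    )
    print(journal.extract(article))  # expected: {'title': 'PRL'}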
16 changes: 8 additions & 8 deletions dags/common/parsing/xml_extractors.py
@@ -17,7 +17,7 @@ def __init__(
         extra_function=lambda s: s,
         prefixes=None,
         all_content_between_tags=False,
-    ) -> None:
+    ):
         super().__init__(destination)
 
         self.destination = destination
@@ -57,7 +57,7 @@ def _process_text_with_extra_function(self, text):
         except Exception:
             self.logger.error("Error in extra function with value", text=text)
 
-    def extract(self, article: ET.Element):
+    def extract(self, article):
         if self.prefixes:
             node_with_prefix = self.extra_function(
                 article.find(self.source, self.prefixes).text
@@ -86,7 +86,7 @@ def __init__(
         default_value=None,
         required=False,
         extra_function=lambda x: x,
-    ) -> None:
+    ):
         super().__init__(destination)
         self.destination = destination
         self.source = source
@@ -111,7 +111,7 @@ def _process_attribute_with_extra_function(self, attribute):
             "Error in extra function with value", attribute=attribute
         )
 
-    def extract(self, article: ET.Element):
+    def extract(self, article):
         node = article.find(self.source)
         value = self._get_attribute_value(node)
         processed_value = self._process_attribute_with_extra_function(value)
@@ -126,14 +126,14 @@ def extract(self, article: ET.Element):
 class CustomExtractor(IExtractor):
     def __init__(
         self, destination, extraction_function, required=False, default_value=None
-    ) -> None:
+    ):
         super().__init__(destination)
         self.destination = destination
         self.extraction_function = extraction_function
         self.default_value = default_value
         self.required = required
 
-    def extract(self, article: ET.Element):
+    def extract(self, article):
         value = self.extraction_function(article)
         if check_value(value):
             return value
@@ -148,13 +148,13 @@ def __init__(
         destination,
         value,
         required=False,
-    ) -> None:
+    ):
         super().__init__(destination)
         self.destination = destination
         self.required = required
         self.value = value
 
-    def extract(self, article: ET.Element):
+    def extract(self, article):
         if not self.value and self.required:
             raise RequiredFieldNotFoundExtractionError(self.destination)
         return self.value
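The last hunk documents the required-field contract: an empty value on a required extractor raises RequiredFieldNotFoundExtractionError. A sketch of that behavior; the class name sits outside the visible hunk, so ConstantExtractor below is an assumed name based on its constant value argument:

    from common.parsing.xml_extractors import ConstantExtractor  # name assumed

    publisher = ConstantExtractor(destination="publisher", value="APS")
    assert publisher.extract(article=None) == "APS"  # the article is ignored

    missing = ConstantExtractor(destination="publisher", value="", required=True)
    missing.extract(article=None)  # raises RequiredFieldNotFoundExtractionError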