Skip to content

Commit

Permalink
monitoring(format): updates exports and search connection
Browse files Browse the repository at this point in the history
Signed-off-by: pamfilos <[email protected]>
  • Loading branch information
pamfilos committed Sep 3, 2024
1 parent 34c1644 commit 8a6f1c4
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 54 deletions.
2 changes: 1 addition & 1 deletion scoap3/management/commands/elastic_search_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from opensearchpy import OpenSearch
from opensearchpy.exceptions import AuthenticationException

from scoap3.management.commands.utils import check_time_unit, get_query


Expand Down
3 changes: 2 additions & 1 deletion scoap3/management/commands/import_legacy_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
env = environ.Env()
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


def convert_to_iso8601(date_string):
parsed_date = datetime.strptime(date_string, "%Y-%m-%d")

Expand Down Expand Up @@ -89,7 +90,7 @@ def handle(self, *args, **options):

es_body = {"query": {"match_all": {}}}
if options["from_date"]:
es_body = {
es_body = {
"query": {
"range": {
"_updated": {
Expand Down
129 changes: 77 additions & 52 deletions scoap3/management/commands/monitor.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
import json
import os, sys
import os
import sys

import urllib3
from django.core.management.base import BaseCommand, CommandParser
from opensearchpy.exceptions import AuthenticationException

from scoap3.management.commands.elastic_search_client import (
OpenSearchClient,
AuthenticationException
)
from scoap3.management.commands.elastic_search_client import OpenSearchClient
from scoap3.management.commands.utils import (
get_timestamp_str,
get_countries_from_response,
get_countries_from_response_legacy,
get_dois_from_response,
Expand All @@ -19,6 +17,7 @@
get_new_added_files_new_scoap3,
get_publishers_from_response,
get_publishers_from_response_legacy,
get_timestamp_str,
)

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
Expand Down Expand Up @@ -63,19 +62,19 @@ def add_arguments(self, parser: CommandParser) -> None:
def handle(self, *args, **options):
es_configs = {
"legacy": {
'host': os.getenv("LEGACY_OPENSEARCH_HOST"),
'username': os.getenv("LEGACY_OPENSEARCH_USERNAME", 'CHANGEME'),
'password': os.getenv("LEGACY_OPENSEARCH_PASSWORD", 'CHANGEME'),
'port': os.getenv("LEGACY_OPENSEARCH_PORT", 443),
'index': os.getenv("LEGACY_OPENSEARCH_INDEX", 'scoap3-records-record'),
"host": os.getenv("LEGACY_OPENSEARCH_HOST"),
"username": os.getenv("LEGACY_OPENSEARCH_USERNAME", "CHANGEME"),
"password": os.getenv("LEGACY_OPENSEARCH_PASSWORD", "CHANGEME"),
"port": os.getenv("LEGACY_OPENSEARCH_PORT", 443),
"index": os.getenv("LEGACY_OPENSEARCH_INDEX", "scoap3-records-record"),
},
"new": {
'host': os.getenv("OPENSEARCH_HOST"),
'username': os.getenv("OPENSEARCH_USERNAME", 'CHANGEME'),
'password': os.getenv("OPENSEARCH_PASSWORD", 'CHANGEME'),
'port': os.getenv("OPENSEARCH_PORT", 443),
'index': os.getenv("OPENSEARCH_INDEX", 'scoap3-backend-qa-articles'),
}
"host": os.getenv("OPENSEARCH_HOST"),
"username": os.getenv("OPENSEARCH_USERNAME", "CHANGEME"),
"password": os.getenv("OPENSEARCH_PASSWORD", "CHANGEME"),
"port": os.getenv("OPENSEARCH_PORT", 443),
"index": os.getenv("OPENSEARCH_INDEX", "scoap3-backend-qa-articles"),
},
}

try:
Expand All @@ -84,39 +83,41 @@ def handle(self, *args, **options):
es_new = OpenSearchClient(**es_configs["new"])
except AuthenticationException:
self.stdout.write(
self.style.WARNING("""
self.style.WARNING(
"""
An error has occured while trying to connect to SEARCH_HOST!!!
Please check config/credentials
""")
"""
)
)
sys.exit()
except Exception:

self.stdout.write(
self.style.WARNING("""
self.style.WARNING(
"""
An error has occured while trying to run monitoring!!!
Make sure you have correctly set up the following ENV vars:
OPENSEARCH_HOST
OPENSEARCH_USERNAME
OPENSEARCH_PASSWORD
OPENSEARCH_PORT
OPENSEARCH_INDEX
OPENSEARCH_INDEX
LEGACY_OPENSEARCH_HOST
LEGACY_OPENSEARCH_USERNAME
LEGACY_OPENSEARCH_PASSWORD
LEGACY_OPENSEARCH_PORT
LEGACY_OPENSEARCH_INDEX
LEGACY_OPENSEARCH_INDEX
""")
"""
)
)
sys.exit()


if options["legacy"]:
dois_created = es_legacy.get_items(
batch_size=options["batch_size"],
Expand All @@ -142,6 +143,14 @@ def handle(self, *args, **options):
action="_created",
)

countries_updated = es_legacy.get_items(
batch_size=options["batch_size"],
gte=options["gte"],
time_unit=options["time_unit"],
parse_function=get_countries_from_response_legacy,
action="_updated",
)

mapped_dois_and_files_legacy = es_legacy.get_items(
batch_size=options["batch_size"],
gte=options["gte"],
Expand Down Expand Up @@ -174,27 +183,26 @@ def handle(self, *args, **options):
action="_updated",
)

summary = {
"created_in_legacy_but_not_in_new": list(
set(dois_created) - set(dois_created_new)
),
"created_in_new_but_not_in_legacy": list(
set(dois_created_new) - set(dois_created)
),
"updated_in_legacy_but_not_in_new": list(
set(dois_updated) - set(dois_updated_new)
),
"updated_in_new_but_not_in_legacy": list(
set(dois_updated_new) - set(dois_updated)
),
"countries_in_legacy_but_not_in_new": list(
set(countries) - set(countries_new)
),
"countries_in_new_but_not_in_legacy": list(
set(countries_new) - set(countries)
),
}

# summary = {
# "created_in_legacy_but_not_in_new": list(
# set(dois_created) - set(dois_created_new)
# ),
# "created_in_new_but_not_in_legacy": list(
# set(dois_created_new) - set(dois_created)
# ),
# "updated_in_legacy_but_not_in_new": list(
# set(dois_updated) - set(dois_updated_new)
# ),
# "updated_in_new_but_not_in_legacy": list(
# set(dois_updated_new) - set(dois_updated)
# ),
# "countries_in_legacy_but_not_in_new": list(
# set(countries) - set(countries_new)
# ),
# "countries_in_new_but_not_in_legacy": list(
# set(countries_new) - set(countries)
# ),
# }

dois_created_new = es_new.get_items(
batch_size=options["batch_size"],
Expand All @@ -220,7 +228,6 @@ def handle(self, *args, **options):
action="_created_at",
)


countries_new_updated = es_new.get_items(
batch_size=options["batch_size"],
gte=options["gte"],
Expand Down Expand Up @@ -273,10 +280,28 @@ def handle(self, *args, **options):
"countries": countries_new_updated,
"files_by_doi": mapped_dois_and_files_new_updated,
"publishers": mapped_dois_and_publishers_updated,
}
},
}

if options["legacy"]:
data_legacy = {
"created": {
"dois": dois_created,
"countries": countries,
"files_by_doi": mapped_dois_and_files_legacy,
"publishers": mapped_dois_and_publishers_created_legacy,
},
"updated": {
"dois": dois_updated,
"countries": countries_updated,
"files_by_doi": mapped_dois_and_added_files_on_update,
"publishers": mapped_dois_and_publishers_updated_legacy,
},
}

file_path = f"harvest_summary_{get_timestamp_str()}.json"
with open(file_path, "w") as file:
json.dump(data, file, indent=4)
file_path = f"harvest_summary_{get_timestamp_str()}_legacy.json"
with open(file_path, "w") as file:
json.dump(data_legacy, file, indent=4)
1 change: 1 addition & 0 deletions scoap3/management/commands/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ def get_query(action, gte, time_unit):
}
}


def get_timestamp_str():
current_date = datetime.now().date()
current_date_str = current_date.strftime("%Y-%m-%d %H:%M:%S")
Expand Down

0 comments on commit 8a6f1c4

Please sign in to comment.