add for preproc and lenses
joofio committed Nov 28, 2024
1 parent 87f07f0 commit 3a33529
Showing 1 changed file with 185 additions and 13 deletions.
198 changes: 185 additions & 13 deletions main.py
@@ -1,9 +1,12 @@
import json
import logging
import os
import socket
import threading
import time
from logging.handlers import RotatingFileHandler

from flask import Flask, Response
import requests  # assumed missing: used by test_preprocessor() and fetch_paginated_data() below
from requests_futures.sessions import FuturesSession

app = Flask(__name__)
@@ -102,12 +105,25 @@ def log_result(
    )

    metrics[metric_path] = value
    if "graphite" in logger_method:
        message = f"{metric_path} {value} {timestamp}\n"

        # Open a socket to Graphite and send the data
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.connect((GRAPHITE_HOST, int(GRAPHITE_PORT)))
            sock.sendall(message.encode("utf-8"))
    elif "file" in logger_method:
        message = f"{metric_path} {value} {timestamp}\n"

        print(f"Sending to Graphite: {message}", end="")


@app.route("/metrics")
def metrics_endpoint():
    metrics_data = "\n".join([f"{key} {value}" for key, value in metrics.items()])
    print(metrics_data)
    return Response(metrics_data, mimetype="text/plain")


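For illustration only (not part of this commit): a minimal sketch of scraping the /metrics endpoint defined above, assuming the Flask app is reachable on localhost:5000 (the host and port used in main() below). Each returned line follows the Graphite plaintext convention of "metric.path value".

import requests

# Hypothetical scrape of the /metrics endpoint exposed above.
resp = requests.get("http://localhost:5000/metrics", timeout=10)
resp.raise_for_status()
for line in resp.text.splitlines():
    metric_path, value = line.rsplit(" ", 1)
    print(metric_path, "->", value)
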
# print(LENSES)
PATIENT_IDS = [
@@ -265,10 +281,13 @@ def prepare_requests(BUNDLES, LENSES, PATIENT_IDS, BASE_URL, method):
                    + "&lenses="
                    + lens
                )

                requests_list.append(
                    (session.post(WEBSITE_URL), bundleid, lens, pid, method)
                )

    return requests_list


def process_responses(requests_list):
    for future, bundleid, lens, pid, method in requests_list:
        response = future.result()
@@ -282,6 +301,7 @@ def process_responses(requests_list):
            pid=pid,
        )


def check_website_status(response):
    """
    Checks the status code of a website.
@@ -297,6 +317,7 @@ def check_website_status(response):
    else:
        return response.status_code, {}


def check_bundles_in_list(BASE_URL):
    ENCHACED_WHITE_LIST = [
        "enhanced-bundlebik-alicia",
@@ -410,39 +431,190 @@ def chek_all_prpcessor_with_post_data(BUNDLES, PATIENT_IDS, BASE_URL):
    return 1


def log_result_preproc(
    method,
    logger_method=["graphite"],
    timestamp=None,
    bundleid=None,
    language=None,
    extension_count=0,
    applied_extension_count=0,
):
    """
    Sends a metric to Graphite.
    """
    metric_path = f"""gh.preproc.{method}.{bundleid}.{language}"""
    timestamp = timestamp or int(time.time())
    if extension_count == 0:
        value = 1
    elif applied_extension_count == 0:
        value = 1
    else:
        value = 0

    message = f"{metric_path} {value} {timestamp}\n"

    if "graphite" in logger_method:
        # Open a socket to Graphite and send the data
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.connect((GRAPHITE_HOST, int(GRAPHITE_PORT)))
            sock.sendall(message.encode("utf-8"))
    elif "file" in logger_method:
        print(f"Sending to Graphite: {message}", end="")


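A hedged usage example for log_result_preproc above (not part of this commit): the bundle id is invented, and the call assumes GRAPHITE_HOST and GRAPHITE_PORT are configured as elsewhere in this file. With both counts non-zero the metric value is 0 (success); a zero extension_count or applied_extension_count yields 1 (failure).

# Hypothetical call; "bundle-example-es" is an invented id.
log_result_preproc(
    method="preprocessing-service-mvp2",
    bundleid="bundle-example-es",
    language="es",
    extension_count=4,
    applied_extension_count=4,
)
# Sends a Graphite plaintext line such as:
#   gh.preproc.preprocessing-service-mvp2.bundle-example-es.es 0 1732800000
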
def test_preprocessor(BASEURL, epiid, language):
    headers = {"Content-Type": "application/json", "Accept": "application/json"}

    url = (
        BASEURL
        + "/focusing/preprocessing/"
        + epiid
        + "?preprocessors=preprocessing-service-mvp2"
    )

    response = requests.post(url, headers=headers)
    # print(response.text)
    result = response.json()

    composition = result["entry"][0]
    ## print(composition)
    json_string = json.dumps(composition)

    extension_count = json_string.count(
        "https://hl7.eu/fhir/ig/gravitate-health/StructureDefinition/HtmlElementLink"
    )
    # print(extension_exist) # Output: ?
    # print(word_in_json(composition, "composition")) # Output: True
    # print(word_in_json(composition, "whites")) # Output: True
    if extension_count == 0:
        print(epiid, "No extension")
        # file.write(epiid + ", No extension\n")
    elif extension_count > 0:
        print(epiid, "Has extension")

    # file.write(epiid + ", " + str(extension_count) + "\n")
    log_result_preproc(
        method="preprocessing-service-mvp2",
        language=language,
        bundleid=epiid,
        extension_count=extension_count,
    )

    return 1


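test_preprocessor above counts occurrences of the HtmlElementLink extension URL in the serialized JSON. For illustration only (not what this commit does), an equivalent-in-spirit, more structural check could walk the resource's extension lists; count_extension_urls is an invented helper name.

def count_extension_urls(resource, url):
    """Recursively count 'extension' entries whose 'url' matches the given URL."""
    count = 0
    if isinstance(resource, dict):
        for key, value in resource.items():
            if key == "extension" and isinstance(value, list):
                count += sum(
                    1 for ext in value if isinstance(ext, dict) and ext.get("url") == url
                )
            count += count_extension_urls(value, url)
    elif isinstance(resource, list):
        for item in resource:
            count += count_extension_urls(item, url)
    return count

# e.g., instead of json_string.count(...):
# extension_count = count_extension_urls(
#     composition,
#     "https://hl7.eu/fhir/ig/gravitate-health/StructureDefinition/HtmlElementLink",
# )
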
def fetch_paginated_data(BASEURL):
    """
    Makes a GET request to a URL, checks for a 'next' element in the response,
    and continues fetching data while 'next' exists.
    Args:
        BASEURL (str): The base URL to fetch data from.
    """
    url = f"{BASEURL}epi/api/fhir/Bundle"

    while url:
        print(f"Fetching data from: {url}")

        # Make a GET request
        try:
            response = requests.get(url)
            response.raise_for_status()  # Raise an HTTPError for bad responses
        except requests.RequestException as e:
            print(f"Failed to fetch data: {e}")
            break

        # Parse the JSON response
        data = response.json()
        if "entry" not in data:
            print("No 'entry' found in the response. Skipping this page.")
            break

        # Process each entry in the current bundle
        for entry in data["entry"]:
            bundle = entry.get("resource", {})
            composition = bundle.get("entry", [{}])[0].get("resource", {})
            id_ = bundle.get("id")
            if not id_:
                continue  # Skip if no ID is found

            language = composition.get("language")
            if not language or language in ["no", "fi", "ja"]:
                continue

            category = composition.get("category")
            if category is None:
                test_preprocessor(BASEURL, id_, language)
            elif category[0]["coding"][0]["code"] == "R":
                test_preprocessor(BASEURL, id_, language)

        # Check for the 'next' key in the response
        next_url = None
        for link in data.get("link", []):
            if link.get("relation") == "next":
                next_url = link.get("url")
                break

        # Update the URL or exit if no 'next' link is found
        if not next_url or url == next_url:
            print("No more pages. Finished processing.")
            break
        else:
            url = next_url


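For context: fetch_paginated_data above expects standard FHIR searchset Bundles, where the next page is advertised via link[].relation == "next". A minimal illustration of the shape it walks (ids and URLs are invented, not from this commit):

# Hypothetical page shape returned by {BASEURL}epi/api/fhir/Bundle.
example_page = {
    "resourceType": "Bundle",
    "type": "searchset",
    "entry": [
        {
            "resource": {  # each entry wraps a document Bundle
                "resourceType": "Bundle",
                "id": "bundle-example-es",
                "entry": [
                    {"resource": {"resourceType": "Composition", "language": "es"}}
                ],
            }
        }
    ],
    "link": [
        {"relation": "self", "url": "https://example.org/epi/api/fhir/Bundle?page=1"},
        {"relation": "next", "url": "https://example.org/epi/api/fhir/Bundle?page=2"},
    ],
}
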
def main():
    # Run the Flask app in a separate thread
    threading.Thread(target=lambda: app.run(host="0.0.0.0", port=5000)).start()

    while True:
        try:
            requests_list = prepare_requests(
                BUNDLES, LENSES, PATIENT_IDS, BASE_URL, "preprocessperlens"
            )
            process_responses(requests_list)
        except Exception as err:
            logger.debug(f"Error on function chek_preprocessor_data -> {err}")

        try:
            requests_list = prepare_requests(
                BUNDLES, ["all"], PATIENT_IDS, BASE_URL, "alllenses"
            )
            process_responses(requests_list)
        except Exception as err:
            logger.debug(f"Error on function chek_all_lenses_data -> {err}")

        try:
            requests_list = prepare_requests(
                BUNDLES, ["all"], PATIENT_IDS, BASE_URL, "allpreprocess"
            )
            process_responses(requests_list)
        except Exception as err:
            logger.debug(f"Error on function chek_all_preprocess_data -> {err}")

        try:
            requests_list = prepare_requests(
                PREPROCBUNDLES, LENSES, PATIENT_IDS, BASE_URL, "send-preprocess"
            )
            process_responses(requests_list)
        except Exception as err:
            logger.debug(
                f"Error on function chek_lenses_foralreadypreprocess_data -> {err}"
            )

        try:
            check_bundles_in_list(BASE_URL)
        except Exception as err:
            logger.debug(f"Error on function check_bundles_in_list -> {err}")

        try:
            fetch_paginated_data(BASE_URL)
        except Exception as err:
            logger.debug(f"Error on function fetch_paginated_data -> {err}")

        time.sleep(3600)


if __name__ == "__main__":
    main()
