From 291c9e87f001e2cc8a8bb0adf0037e6f7f4a72dd Mon Sep 17 00:00:00 2001 From: Pingu Carsti Date: Wed, 14 Aug 2024 16:17:15 +0200 Subject: [PATCH] using threads for downloads collection --- rook/usage/downloads.py | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/rook/usage/downloads.py b/rook/usage/downloads.py index 488785b..3c3ad5c 100644 --- a/rook/usage/downloads.py +++ b/rook/usage/downloads.py @@ -1,6 +1,7 @@ import os import glob import subprocess +from concurrent.futures import ThreadPoolExecutor from datetime import datetime import ipaddress import logging @@ -96,23 +97,17 @@ def collect(self, time_start=None, time_end=None, outdir=None): return self.parse(log_files, time_start, time_end, outdir) def parse(self, log_files, time_start=None, time_end=None, outdir=None): - records = [] - # zgrep "GET /outputs/rook/.*/.*\.nc" /var/log/nginx/access.log-20240801.gz - search_pattern = ( - rf"GET {self.output_path}/.*/.*\.nc" # Match request with the output path - ) - - for log_file in log_files: + def process_file(log_file): + records = [] try: # Use zgrep to pre-filter logs based on the output path p = subprocess.run( ["zgrep", search_pattern, log_file], stdout=subprocess.PIPE, - text=True, # Automatically decode stdout to strings - check=True, # Raise CalledProcessError if the command fails + text=True, + check=True, ) lines = p.stdout.splitlines() - for line in lines: try: record = parse_record(line) @@ -121,12 +116,22 @@ def parse(self, log_files, time_start=None, time_end=None, outdir=None): continue except subprocess.CalledProcessError as e: LOGGER.error(f"Failed to process log file {log_file}: {e}") - continue + return records + + search_pattern = rf"GET {self.output_path}/.*/.*\.nc" + all_records = [] + + with ThreadPoolExecutor() as executor: + futures = [ + executor.submit(process_file, log_file) for log_file in log_files + ] + for future in futures: + all_records.extend(future.result()) - if not records: 
+ if not all_records: raise NotFoundError("Could not find any records") - df = pd.DataFrame(records) + df = pd.DataFrame(all_records) df = df[ df["request"].str.contains(rf"{self.output_path}/.*/.*\.nc", regex=True) ]