Skip to content

Commit

Permalink
using thredds for downloads collection
Browse files Browse the repository at this point in the history
  • Loading branch information
cehbrecht committed Aug 14, 2024
1 parent 0ddf742 commit 291c9e8
Showing 1 changed file with 18 additions and 13 deletions.
31 changes: 18 additions & 13 deletions rook/usage/downloads.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import glob
import subprocess
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
import ipaddress
import logging
Expand Down Expand Up @@ -96,23 +97,17 @@ def collect(self, time_start=None, time_end=None, outdir=None):
return self.parse(log_files, time_start, time_end, outdir)

def parse(self, log_files, time_start=None, time_end=None, outdir=None):
records = []
# zgrep "GET /outputs/rook/.*/.*\.nc" /var/log/nginx/access.log-20240801.gz
search_pattern = (
rf"GET {self.output_path}/.*/.*\.nc" # Match request with the output path
)

for log_file in log_files:
def process_file(log_file):
records = []
try:
# Use zgrep to pre-filter logs based on the output path
p = subprocess.run(
["zgrep", search_pattern, log_file],
stdout=subprocess.PIPE,
text=True, # Automatically decode stdout to strings
check=True, # Raise CalledProcessError if the command fails
text=True,
check=True,
)
lines = p.stdout.splitlines()

for line in lines:
try:
record = parse_record(line)
Expand All @@ -121,12 +116,22 @@ def parse(self, log_files, time_start=None, time_end=None, outdir=None):
continue
except subprocess.CalledProcessError as e:
LOGGER.error(f"Failed to process log file {log_file}: {e}")
continue
return records

search_pattern = rf"GET {self.output_path}/.*/.*\.nc"
all_records = []

with ThreadPoolExecutor() as executor:
futures = [
executor.submit(process_file, log_file) for log_file in log_files
]
for future in futures:
all_records.extend(future.result())

if not records:
if not all_records:
raise NotFoundError("Could not find any records")

df = pd.DataFrame(records)
df = pd.DataFrame(all_records)
df = df[
df["request"].str.contains(rf"{self.output_path}/.*/.*\.nc", regex=True)
]
Expand Down

0 comments on commit 291c9e8

Please sign in to comment.