-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
1 changed file
with
89 additions
and
115 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,164 +1,138 @@ | ||
import subprocess | ||
from datetime import datetime | ||
import gzip | ||
import os | ||
import glob | ||
from urllib.parse import urlparse | ||
import subprocess | ||
from datetime import datetime | ||
import ipaddress | ||
import pandas as pd | ||
import logging | ||
from urllib.parse import urlparse | ||
|
||
import pandas as pd | ||
from pywps import configuration as config | ||
|
||
from .base import Usage | ||
|
||
import logging | ||
|
||
LOGGER = logging.getLogger() | ||
|
||
|
||
class NotFoundError(Exception): | ||
"""Value not found Exception""" | ||
class NotFoundError(ValueError): | ||
"""Raised when a log entry is not found or is invalid.""" | ||
|
||
pass | ||
|
||
|
||
class AddressValueError(Exception): | ||
"""IP address value error""" | ||
class AddressValueError(ValueError): | ||
"""Raised when an IP address cannot be parsed.""" | ||
|
||
pass | ||
|
||
|
||
def dot2longip(ip): | ||
"""Converts an IPv4 address to an IP number""" | ||
|
||
ip_number = 0 | ||
|
||
"""Converts an IPv4 address to an IP number.""" | ||
try: | ||
parsed_ip = ipaddress.IPv4Address(ip) | ||
ip_number = int(parsed_ip) | ||
except (AddressValueError, ipaddress.AddressValueError): | ||
ip_number = 0 | ||
msg = ( | ||
"Could not convert this IP address to an IP number: {}." "Skipping..." | ||
).format(ip) | ||
LOGGER.debug(msg) | ||
|
||
return ip_number | ||
return int(ipaddress.IPv4Address(ip)) | ||
except ipaddress.AddressValueError: | ||
LOGGER.debug(f"Could not convert IP address to an IP number: {ip}. Skipping...") | ||
return 0 | ||
|
||
|
||
def parse_record(line): | ||
""" | ||
Parse a log record. | ||
:param line: access log record line | ||
:returns: dict with log record | ||
""" | ||
# result record | ||
record = dict() | ||
# raw line / record | ||
_line = line.strip() | ||
# remote host (IP) | ||
record["remote_host_ip"] = None | ||
# converted remote host IP address to IP number | ||
record["ip_number"] = None | ||
# datetime.datetime object of request | ||
record["datetime"] = None | ||
# timezone request | ||
record["timezone"] = None | ||
# type of HTTP request (GET, POST, etc.) | ||
record["request_type"] = None | ||
# HTTP request | ||
record["request"] = None | ||
# protocol and version of request | ||
record["protocol"] = None | ||
# HTTP response status code | ||
record["status_code"] = 0 | ||
# size of HTTP response | ||
record["size"] = 0 | ||
# Referer | ||
record["referer"] = None | ||
# User-Agent | ||
record["user_agent"] = None | ||
|
||
LOGGER.debug("Parsing log record") | ||
tokens = _line.split() | ||
|
||
if len(tokens) < 12: | ||
msg = "Line does not contain expected apache record format" | ||
LOGGER.warning(msg) | ||
raise NotFoundError(msg) | ||
|
||
# validate IP address | ||
record["ip_number"] = dot2longip(tokens[0]) | ||
if record["ip_number"] != 0: | ||
record["remote_host_ip"] = tokens[0] | ||
"""Parse a log record into a dictionary.""" | ||
tokens = line.strip().split() | ||
MIN_EXPECTED_TOKENS = 12 | ||
|
||
try: | ||
record["datetime"] = datetime.strptime( | ||
tokens[3].lstrip("["), "%d/%b/%Y:%H:%M:%S" | ||
) | ||
except ValueError: | ||
msg = ( | ||
"Datetime token ({}) that does not match the expected " "datetime format" | ||
).format(tokens[3].lstrip("[")) | ||
LOGGER.warning(msg) | ||
raise NotFoundError(msg) | ||
if len(tokens) < MIN_EXPECTED_TOKENS: | ||
LOGGER.warning("Line does not contain the expected apache record format") | ||
raise NotFoundError("Invalid log line format") | ||
|
||
record["timezone"] = tokens[4].rstrip("]") | ||
record["request_type"] = tokens[5].lstrip('"') | ||
record["request"] = tokens[6] | ||
record["protocol"] = tokens[7].rstrip('"') | ||
ip_number = dot2longip(tokens[0]) | ||
if ip_number == 0: | ||
raise NotFoundError("Invalid IP address") | ||
|
||
try: | ||
record["status_code"] = int(tokens[8]) | ||
if tokens[9] != "-": # ignore size values that are "-" | ||
record["size"] = int(tokens[9]) | ||
record_time = datetime.strptime(tokens[3].lstrip("["), "%d/%b/%Y:%H:%M:%S") | ||
except ValueError: | ||
msg = ( | ||
"Status code ({}) or size ({}) are invalid literals for " "int type" | ||
).format(tokens[8], tokens[9]) | ||
LOGGER.warning(msg) | ||
raise NotFoundError(msg) | ||
LOGGER.warning(f"Invalid datetime format: {tokens[3].lstrip('[')}") | ||
raise NotFoundError("Invalid datetime format") | ||
|
||
record["referer"] = tokens[10].replace('"', "") | ||
record["user_agent"] = " ".join(tokens[11:]).lstrip('"').rstrip('"') | ||
return record | ||
try: | ||
status_code = int(tokens[8]) | ||
size = int(tokens[9]) if tokens[9] != "-" else 0 | ||
except ValueError: | ||
LOGGER.warning(f"Invalid status code or size: {tokens[8]}, {tokens[9]}") | ||
raise NotFoundError("Invalid status code or size") | ||
|
||
return { | ||
"remote_host_ip": tokens[0], | ||
"ip_number": ip_number, | ||
"datetime": record_time, | ||
"timezone": tokens[4].rstrip("]"), | ||
"request_type": tokens[5].lstrip('"'), | ||
"request": tokens[6], | ||
"protocol": tokens[7].rstrip('"'), | ||
"status_code": status_code, | ||
"size": size, | ||
"referer": tokens[10].replace('"', ""), | ||
"user_agent": " ".join(tokens[11:]).strip('"'), | ||
} | ||
|
||
|
||
class Downloads(Usage): | ||
def __init__(self): | ||
self.output_path = urlparse(config.get_config_value("server", "outputurl")).path | ||
self.http_log_path = config.get_config_value("logging", "http_log_path") | ||
self._output_path = urlparse( | ||
config.get_config_value("server", "outputurl") | ||
).path | ||
self._http_log_path = config.get_config_value("logging", "http_log_path") | ||
|
||
@property | ||
def output_path(self): | ||
return self._output_path | ||
|
||
@property | ||
def http_log_path(self): | ||
return self._http_log_path | ||
|
||
def collect(self, time_start=None, time_end=None, outdir=None): | ||
log_files = sorted(glob.glob(os.path.join(self.http_log_path, "access.log*"))) | ||
return self.parse(log_files, time_start, time_end, outdir) | ||
|
||
def parse(self, log_files, time_start=None, time_end=None, outdir=None): | ||
records = [] | ||
for f in log_files: | ||
p = subprocess.run(["zgrep", self.output_path, f], stdout=subprocess.PIPE) | ||
lines = p.stdout.split(b"\n") | ||
for line in lines: | ||
try: | ||
record = parse_record(line.decode()) | ||
except NotFoundError: | ||
continue | ||
if not record["request"].startswith(self.output_path): | ||
continue | ||
records.append(record) | ||
search_pattern = f'"{self.output_path}' # Match request with the output path | ||
|
||
for log_file in log_files: | ||
try: | ||
# Use zgrep to pre-filter logs based on the output path | ||
p = subprocess.run( | ||
["zgrep", search_pattern, log_file], | ||
stdout=subprocess.PIPE, | ||
text=True, # Automatically decode stdout to strings | ||
check=True, # Raise CalledProcessError if the command fails | ||
) | ||
lines = p.stdout.splitlines() | ||
|
||
for line in lines: | ||
try: | ||
record = parse_record(line) | ||
records.append(record) | ||
except NotFoundError: | ||
continue | ||
except subprocess.CalledProcessError as e: | ||
LOGGER.error(f"Failed to process log file {log_file}: {e}") | ||
continue | ||
|
||
if not records: | ||
raise NotFoundError("could not find any records") | ||
raise NotFoundError("Could not find any records") | ||
|
||
df = pd.DataFrame(records) | ||
df = df.loc[ | ||
df = df[ | ||
df["request"].str.contains(rf"{self.output_path}/.*/.*\.nc", regex=True) | ||
] # noqa | ||
# df['datetime'] = pd.to_datetime(df['datetime']) | ||
] | ||
|
||
if time_start: | ||
df = df.loc[df["datetime"] >= time_start] | ||
df = df[df["datetime"] >= time_start] | ||
if time_end: | ||
df = df.loc[df["datetime"] <= time_end] | ||
df = df[df["datetime"] <= time_end] | ||
|
||
fname = os.path.join(outdir, "downloads.csv") | ||
df.to_csv(fname, index=False) | ||
return fname |