Skip to content

Commit

Permalink
Merge pull request #283 from cta-observatory/fix_error_on_empty
Browse files Browse the repository at this point in the history
Fix misleading error on opening empty file
  • Loading branch information
maxnoe authored Oct 17, 2024
2 parents 03ae8e3 + fef02f4 commit c953b2a
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 13 deletions.
29 changes: 17 additions & 12 deletions src/eventio/file_types.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,39 @@
import gzip
try:
import zstandard as zstd
has_zstd = True
except ImportError:
has_zstd = False
except ModuleNotFoundError:
zstd = None

from .constants import (
SYNC_MARKER_SIZE,
SYNC_MARKER_LITTLE_ENDIAN,
SYNC_MARKER_BIG_ENDIAN,
)

ZSTD_MARKER = b'\x28\xb5\x2f\xfd'
GZIP_MARKER = b'\x1f\x8b'


def _check_marker(path, marker):
with open(path, 'rb') as f:
marker_bytes = f.read(len(marker))

if len(marker_bytes) < len(marker):
return False

return marker_bytes == marker

def is_gzip(path):
'''Test if a file is gzipped by reading its first two bytes and compare
to the gzip marker bytes.
'''
with open(path, 'rb') as f:
marker_bytes = f.read(2)

return marker_bytes[0] == 0x1f and marker_bytes[1] == 0x8b
return _check_marker(path, GZIP_MARKER)


def is_zstd(path):
'''Test if a file is compressed using zstd using its magic marker bytes
'''
with open(path, 'rb') as f:
marker_bytes = f.read(4)

return marker_bytes == b'\x28\xb5\x2f\xfd'
return _check_marker(path, ZSTD_MARKER)


def is_eventio(path):
Expand All @@ -39,7 +44,7 @@ def is_eventio(path):
with gzip.open(path, 'rb') as f:
marker_bytes = f.read(SYNC_MARKER_SIZE)
elif is_zstd(path):
if not has_zstd:
if zstd is None:
raise IOError('You need the `zstandard` module to read zstd files')
with open(path, 'rb') as f:
cctx = zstd.ZstdDecompressor()
Expand Down
10 changes: 9 additions & 1 deletion tests/test_open_file.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import eventio
from os import path
from itertools import zip_longest
import eventio
import pytest


def test_is_install_folder_a_directory():
Expand All @@ -19,6 +20,13 @@ def test_file_is_iterable():
for event in f:
pass

def test_empty(tmp_path):
path = tmp_path / "empty.dat"
path.write_bytes(b"")

with pytest.raises(ValueError, match="^File .* is not an eventio file$"):
eventio.EventIOFile(path)


def test_file_has_objects_at_expected_position():
expected = [
Expand Down

0 comments on commit c953b2a

Please sign in to comment.