Skip to content

Commit

Permalink
refactoring.
Browse files Browse the repository at this point in the history
  • Loading branch information
Xiao-zhen-Liu committed Jan 29, 2025
1 parent 3717520 commit 2231b9e
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 29 deletions.
2 changes: 1 addition & 1 deletion core/amber/src/main/python/.flake8
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ extend-ignore =
E203,
exclude =
proto,
max-complexity = 11
max-complexity = 10
max-line-length = 88
33 changes: 17 additions & 16 deletions core/amber/src/main/python/core/storage/iceberg/iceberg_document.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,25 +170,11 @@ def _seek_to_usable_file(self) -> Iterator[FileScanTask]:
# self.table.inspect.entries() does not work with java files, need
# to implement the logic
# to find file_sequence_number for each data file ourselves
file_sequence_map = {}
current_snapshot = self.table.current_snapshot()
if current_snapshot is None:
return iter([])
for manifest in current_snapshot.manifests(self.table.io):
for entry in manifest.fetch_manifest_entry(io=self.table.io):
file_sequence_map[entry.data_file.file_path] = (
entry.sequence_number
)

# Retrieve and sort the file scan tasks by file sequence number
file_scan_tasks = list(self.table.scan().plan_files())
# Sort files by their sequence number. Files without a sequence
# number will be read last.
sorted_file_scan_tasks = sorted(
file_scan_tasks,
key=lambda t: file_sequence_map.get(
t.file.file_path, float("inf")
),
sorted_file_scan_tasks = self._extract_sorted_file_scan_tasks(
current_snapshot
)
# Skip records in files before the `from_index`
for task in sorted_file_scan_tasks:
Expand All @@ -206,6 +192,21 @@ def _seek_to_usable_file(self) -> Iterator[FileScanTask]:
else:
return iter([])

def _extract_sorted_file_scan_tasks(self, current_snapshot):
    """
    Plan the table's file scan tasks and order them by file sequence number.

    The sequence number of every data file is collected by walking the
    manifest entries of ``current_snapshot``. The tasks produced by
    ``self.table.scan().plan_files()`` are then sorted on that number.

    :param current_snapshot: the snapshot whose manifests are read. It must
        not be None, because ``manifests`` is called on it unconditionally.
    :return: a list of file scan tasks in ascending sequence-number order.
        Tasks whose file has no usable sequence number (its path is absent
        from the manifests, or the entry records None) are placed last, in
        the order the scan planned them.
    """
    table_io = self.table.io
    file_sequence_map = {}
    for manifest in current_snapshot.manifests(table_io):
        for entry in manifest.fetch_manifest_entry(io=table_io):
            file_sequence_map[entry.data_file.file_path] = entry.sequence_number

    def sequence_key(task):
        # A missing path and an explicit None both mean "no sequence
        # number". Map both to +inf so such files sort last instead of
        # making sorted() compare None against a number.
        sequence_number = file_sequence_map.get(task.file.file_path)
        return float("inf") if sequence_number is None else sequence_number

    # sorted() accepts any iterable and is stable, so tasks sharing a key
    # keep the relative order in which plan_files() yielded them.
    return sorted(self.table.scan().plan_files(), key=sequence_key)

def __iter__(self) -> Iterator[T]:
return self

Expand Down
29 changes: 17 additions & 12 deletions core/amber/src/main/python/core/storage/vfs_uri_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ def decode_uri(
) -> (
WorkflowIdentity,
ExecutionIdentity,
Optional[OperatorIdentity],
PortIdentity,
OperatorIdentity,
Optional[PortIdentity],
VFSResourceType,
):
parsed_uri = urlparse(uri)
Expand All @@ -46,7 +46,18 @@ def extract_value(key: str) -> str:
execution_id = ExecutionIdentity(int(extract_value("eid")))
operator_id = OperatorIdentity(extract_value("opid"))

port_identity = None
port_identity = VFSURIFactory._extract_optional_port_identity(segments, uri)

resource_type_str = segments[-1].lower()
try:
resource_type = VFSResourceType(resource_type_str)
except ValueError:
raise ValueError(f"Unknown resource type: {resource_type_str}")

return workflow_id, execution_id, operator_id, port_identity, resource_type

@staticmethod
def _extract_optional_port_identity(segments, uri):
if "pid" in segments:
try:
pid_index = segments.index("pid")
Expand All @@ -55,17 +66,11 @@ def extract_value(key: str) -> str:
if port_type != "I" and port_type != "E":
raise ValueError(f"Invalid port type: {port_type} in URI: {uri}")
is_internal = port_type == "I"
port_identity = PortIdentity(port_id, is_internal)
return PortIdentity(port_id, is_internal)
except (ValueError, IndexError):
raise ValueError(f"Invalid port information in URI: {uri}")

resource_type_str = segments[-1].lower()
try:
resource_type = VFSResourceType(resource_type_str)
except ValueError:
raise ValueError(f"Unknown resource type: {resource_type_str}")

return workflow_id, execution_id, operator_id, port_identity, resource_type
else:
return None

@staticmethod
def create_result_uri(workflow_id, execution_id, operator_id, port_identity) -> str:
Expand Down

0 comments on commit 2231b9e

Please sign in to comment.