Skip to content

Commit

Permalink
Support zipped iOS filesystems on Windows
Browse files Browse the repository at this point in the history
By unzipping to a temporary location rather than using NamedTemporaryFile.
  • Loading branch information
Nicholas FitzRoy-Dale committed Aug 5, 2024
1 parent 7dff68e commit 10a293e
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 127 deletions.
154 changes: 27 additions & 127 deletions rime/filesystem/ios.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,15 +178,15 @@ def ios_open_raw(self, path, mode):


class IosDeviceFilesystem(DeviceFilesystem, IosDeviceFilesystemBase):
def __init__(self, id_: str, root: str, metadata_db_path: str, writeable_manifest: bool = False):
def __init__(self, id_: str, root: str, metadata_db_path: str, writeable_manifest: bool = False, device_settings=None):
self.id_ = id_
self.root = root
self.manifest = sqlite3_connect_with_regex_support(
os.path.join(self.root, 'Manifest.db'),
read_only=not writeable_manifest
)
self.file_table = Table('Files')
self._settings = DeviceSettings(root)
self._settings = DeviceSettings(root) if device_settings is None else device_settings
self._converter = _IosManifest(self.manifest)
self._metadata = metadata.MetadataDb(metadata_db_path)
self._device_info = None
Expand Down Expand Up @@ -342,52 +342,21 @@ def __init__(self, id_: str, root: str, metadata_db_path: str):
# store the path of the root for other functions
# to be able to open the zipfile
self.root = root
self.zipped_filesystem = zipsupport.ZippedFilesystem(root)

# keep a reference to the temp file in the object
self.temp_manifest_pathname = zipsupport.temp_file_name()
self.temp_settings_pathname = zipsupport.temp_file_name()

with zipfile.ZipFile(self.root) as zp:
# get the main directory contained in the .zip container file
main_dir = zipsupport.get_zipfile_main_dir(zp)

with (main_dir / 'Manifest.db').open('rb') as zf, open(self.temp_manifest_pathname, 'w+b') as h:
h.write(zf.read())

with (main_dir / '_rime_settings.db').open('rb') as zf, open(self.temp_settings_pathname, 'w+b') as h:
h.write(zf.read())

self.manifest = sqlite3_connect_with_regex_support(self.temp_manifest_pathname, read_only=True)
self.file_table = Table('Files')

settings_dir, settings_file = os.path.split(self.temp_settings_pathname)
self._settings = DeviceSettings(settings_dir, settings_file)
self._converter = _IosManifest(self.manifest)
self._metadata = metadata.MetadataDb(metadata_db_path)
self._device_info = None

def __del__(self):
del self._converter

try:
self.manifest.close()
except:
pass
# Find the unzipped root, which is the single directory below 'zipped_filesystem'
for elem in os.listdir(self.zipped_filesystem.unzipped_dirname):
if os.path.isdir(os.path.join(self.zipped_filesystem.unzipped_dirname, elem)):
self.unzipped_root = os.path.join(self.zipped_filesystem.unzipped_dirname, elem)
break
else:
raise ValueError("The zipfile does not contain a single directory.")

try:
del self._settings
except:
pass

for pathname in [self.temp_manifest_pathname, self.temp_settings_pathname]:
try:
os.remove(pathname)
except:
pass
self._settings = DeviceSettings(self.unzipped_root)
self._real = IosDeviceFilesystem(id_, self.unzipped_root, metadata_db_path, device_settings=self._settings)

@classmethod
def is_device_filesystem(cls, path) -> bool:

if not zipfile.is_zipfile(path):
return False

Expand All @@ -402,118 +371,49 @@ def create(cls, id_: str, root: str, metadata_db_path, template: Optional['Devic
return IosDeviceFilesystem.create(id_, root, metadata_db_path, template=template)

def is_subset_filesystem(self) -> bool:
return self._settings.is_subset_fs()
return self._real.is_subset_filesystem()

def scandir(self, path) -> list[DirEntry]:
return self._converter.scandir(path)
return self._real.scandir(path)

def get_dir_entry(self, path):
raise NotImplementedError()
return self._real.get_dir_entry(path)

def exists(self, path) -> bool:

real_path = self._converter.get_hashed_pathname(path)

# open the zipfile stored in `self.root` and find out if it
# contains the `real_path
with zipfile.ZipFile(self.root) as zp:
# get the main directory contained in the .zip container file
main_dir = zipsupport.get_zipfile_main_dir(zp)
return (main_dir / real_path).exists()
return self._real.exists(path)

def getsize(self, path) -> int:
with zipfile.ZipFile(self.root) as zp:
# get the main directory contained in the .zip container file
main_dir = zipsupport.get_zipfile_main_dir(zp)
return zipsupport.path_to_info(zp, main_dir / self._converter.get_hashed_pathname(path)).file_size
return self._real.getsize(path)

def ios_open_raw(self, path, mode):
# TODO: mode
tmp_copy = tempfile.NamedTemporaryFile(mode='w+b')
with zipfile.ZipFile(self.root) as zp:
# get the main directory contained in the .zip container file
main_dir = zipsupport.get_zipfile_main_dir(zp)
with (main_dir / path).open('rb') as zf:
tmp_copy.write(zf.read())
return tmp_copy
return self._real.ios_open_raw(path, mode)

def open(self, path):
return self.ios_open_raw(self._converter.get_hashed_pathname(path), 'rb')
return self._real.open(path)

def create_file(self, path):
raise NotImplementedError
return self._real.create_file(path)

def sqlite3_connect(self, path, read_only=True):
tmp_copy = tempfile.NamedTemporaryFile(mode='w+b')

with zipfile.ZipFile(self.root) as zp:
# get the main directory contained in the .zip container file
main_dir = zipsupport.get_zipfile_main_dir(zp)
with (main_dir / self._converter.get_hashed_pathname(path)).open('rb') as zf:
tmp_copy.write(zf.read())

log.debug(f"iOS connecting to {tmp_copy.name}")
return sqlite3_connect_with_regex_support(tmp_copy.name, read_only=read_only)
return self._real.sqlite3_connect(path, read_only=read_only)

def sqlite3_create(self, path):
raise NotImplementedError

def lock(self, locked: bool):
self._settings.set_locked(locked)

# update the settings file back in the zipped file
# for persistent settings preferenses
with zipfile.ZipFile(self.root, 'w') as zp:
# get the main directory contained in the .zip container file
main_dir = zipsupport.get_zipfile_main_dir(zp)
with (main_dir / '_rime_settings.db').open('wb') as zf, open(self.temp_settings_pathname, 'rb') as src_handle:
zf.write(src_handle.read())
return self._real.lock(locked)

def is_locked(self) -> bool:
return self._settings.is_locked()

def get_device_info(self) -> dict:
# Cache the device info
if self._device_info is None:
try:
with zipfile.ZipFile(self.root) as zp:
# get the main directory contained in the .zip container file
main_dir = zipsupport.get_zipfile_main_dir(zp)
# It should contain an Info.plist file
info_plist_file = main_dir / 'Info.plist'
if info_plist_file.exists():
log.debug(f"Reading {info_plist_file}")
with info_plist_file.open('rb') as f:
info_plist = plistlib.load(f)
self._device_info = {
k: info_plist.get(k, "") for k in [
'Device Name', 'Display Name',
'Product Name', 'Product Type', 'Product Version', 'Build Version',
'GUID', 'ICCID', 'IMEI', 'MEID', 'Phone Number', 'Serial Number',
'Target Identifier', 'Target Type', 'Unique Identifier'
]
}
else:
log.warning(f"Failed to read {info_plist_file}")
self._device_info = {}
except zipfile.BadZipFile:
log.warning(f"Failed to read Zipfile {self.root}")
self._device_info = {}
except plistlib.InvalidFileException:
log.info(f"Failed to read {info_plist_file} from within {self.root}")
self._device_info = {}
else:
self._device_info = {}
return self._device_info

return self._real.is_locked()

def dirname(self, path):
raise NotImplementedError(path)
return self._real.dirname(path)

def basename(self, path):
raise NotImplementedError(path)
return self._real.basename(path)

def stat(self, pathname):
raise NotImplementedError(pathname)
return self._real.stat(pathname)


class IosEncryptedDeviceFilesystem(DeviceFilesystem):
Expand Down
59 changes: 59 additions & 0 deletions rime/filesystem/zipsupport.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from contextlib import contextmanager
import os
import secrets
import shutil
import tempfile
import time
from zipfile import ZipFile, Path, ZipInfo

def get_zipfile_main_dir(zf: ZipFile) -> Path:
Expand Down Expand Up @@ -29,6 +32,40 @@ def path_to_info(zf: ZipFile, path: Path) -> ZipInfo:
"""
return zf.getinfo(str(path))

class ZippedFilesystem:
def __init__(self, zipped_pathname):
self.zipped_pathname = zipped_pathname
self.unzipped_dirname = self.zipped_pathname_to_unzipped_dirname(zipped_pathname)

if os.path.exists(self.unzipped_dirname):
if not self._is_unzipped():
shutil.rmtree(self.unzipped_dirname)

if not self._is_unzipped():
self._unzip()

def _is_unzipped(self):
return os.path.exists(os.path.join(self.unzipped_dirname, 'complete'))

def _unzip(self):
os.makedirs(self.unzipped_dirname, exist_ok=True)

with ZipFile(self.zipped_pathname) as zf:
zf.extractall(self.unzipped_dirname)

with open(os.path.join(self.unzipped_dirname, 'complete'), 'w') as f:
f.write(str(time.time()))

@classmethod
def zipped_pathname_to_unzipped_dirname(cls, zipped_pathname):
"""
Convert a zipped pathname to an unzipped directory name.
"""
dirname = os.path.dirname(zipped_pathname)
filename = os.path.splitext(os.path.basename(zipped_pathname))[0]

return os.path.join(dirname, f'_unzipped_{filename}')

def temp_file_name() -> str:
"""
Return a temporary file name. This is subject to race conditions but is the best we can do for Windows.
Expand All @@ -46,3 +83,25 @@ def temp_file_name() -> str:
raise FileExistsError("Could not create a temporary file.")

return pathname

class TmpFile:
"""
Produce a Windows-friendly temporary file which can be used as a context manager.
"""
def __init__(self):
self.pathname = temp_file_name()

def __del__(self):
if os.path.exists(self.pathname):
os.unlink(self.pathname)

def open(self, mode='w+b'):
return open(self.pathname, mode)

@contextmanager
def context_manager(self, fn, *args, **kw):
try:
yield fn(*args, **kw)
finally:
if os.path.exists(self.pathname):
os.unlink(self.pathname)

0 comments on commit 10a293e

Please sign in to comment.