Skip to content

Commit

Permalink
fix iterating through resource table with seek method by recreating t…
Browse files Browse the repository at this point in the history
…he table with rowid sorted
  • Loading branch information
Bodong-Yang committed Dec 11, 2024
1 parent 7d75521 commit f68b250
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 36 deletions.
41 changes: 7 additions & 34 deletions src/ota_metadata/legacy/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
from ota_metadata.file_table._table import FileTableRegularFiles
from ota_metadata.utils import DownloadInfo
from ota_metadata.utils.cert_store import CAChainStore
from ota_metadata.utils.sqlite3_helper import sort_and_place
from otaclient_common.common import urljoin_ensure_base
from otaclient_common.typing import StrOrPath

Expand All @@ -83,39 +84,6 @@
DB_TIMEOUT = 16 # seconds


def _sort_ft_regular_in_place(_orm: FileTableRegularFilesORM) -> None:
"""Sort the ft_regular table by digest, and then replace the old table
with the sorted one.
This is required for later otaclient applies update to standby slot.
"""
SORTED_TABLE_NAME = "ft_regular_sorted"
ORIGINAL_TABLE_NAME = FileTableRegularFiles.table_name

_table_create_stmt = _orm.orm_table_spec.table_create_stmt(SORTED_TABLE_NAME)
_dump_sorted = (
f"INSERT INTO {SORTED_TABLE_NAME} SELECT * FROM "
f"{ORIGINAL_TABLE_NAME} ORDER BY digest;"
)

conn = _orm.orm_con
with conn as conn:
conn.executescript(
"\n".join(
[
"BEGIN;",
_table_create_stmt,
_dump_sorted,
f"DROP TABLE {ORIGINAL_TABLE_NAME};",
f"ALTER TABLE {SORTED_TABLE_NAME} RENAME TO {ORIGINAL_TABLE_NAME};",
]
)
)

with conn as conn:
conn.execute("VACUUM;")


class OTAMetadata:
"""
workdir layout:
Expand Down Expand Up @@ -274,7 +242,12 @@ def download_metafiles(
_orm=_ft_regular_orm,
_orm_rs=_rs_orm,
)
_sort_ft_regular_in_place(_ft_regular_orm)
# NOTE: also check file_table definition at ota_metadata.file_table._table
sort_and_place(
_ft_regular_orm,
FileTableRegularFiles.table_name,
order_by_col="digest",
)
except Exception as e:
_err_msg = f"failed to parse CSV metafiles: {e!r}"
logger.error(_err_msg)
Expand Down
31 changes: 31 additions & 0 deletions src/ota_metadata/utils/sqlite3_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def iter_all(
"""Iter all entries with seek method by rowid.
NOTE: the target table must has rowid defined!
NOTE: the rowid MUST BE continues without any holes!
"""
_pagination_stmt = self.orm_table_spec.table_select_stmt(
select_from=self.orm_table_name,
Expand All @@ -55,6 +56,7 @@ def iter_all_with_shuffle(
"""Iter all entries with seek method by rowid, shuffle each batch before yield.
NOTE: the target table must has rowid defined!
NOTE: the rowid MUST BE continues without any holes!
"""
_pagination_stmt = self.orm_table_spec.table_select_stmt(
select_from=self.orm_table_name,
Expand All @@ -75,3 +77,32 @@ def iter_all_with_shuffle(

random.shuffle(_batch)
yield from _batch


def sort_and_place(_orm: ORMBase, table_name: str, *, order_by_col: str) -> None:
"""Sort the table, and then replace the old table with the sorted one."""
ORIGINAL_TABLE_NAME = table_name
SORTED_TABLE_NAME = f"{table_name}_sorted"
_table_spec = _orm.orm_table_spec

_table_create_stmt = _table_spec.table_create_stmt(SORTED_TABLE_NAME)
_dump_sorted = (
f"INSERT INTO {SORTED_TABLE_NAME} SELECT * FROM "
f"{ORIGINAL_TABLE_NAME} ORDER BY {order_by_col};"
)

conn = _orm.orm_con
with conn as conn:
conn.executescript(
"\n".join(
[
"BEGIN;",
_table_create_stmt,
_dump_sorted,
f"DROP TABLE {ORIGINAL_TABLE_NAME};",
f"ALTER TABLE {SORTED_TABLE_NAME} RENAME TO {ORIGINAL_TABLE_NAME};",
]
)
)
with conn as conn:
conn.execute("VACUUM;")
11 changes: 9 additions & 2 deletions src/otaclient/create_standby/_delta_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@
)
from ota_metadata.legacy.metadata import OTAMetadata
from ota_metadata.legacy.rs_table import (
ResourceTable,
RSTableORMThreadPool,
)
from ota_metadata.utils.sqlite3_helper import sort_and_place
from otaclient._status_monitor import StatusReport, UpdateProgressReport
from otaclient.configs.cfg import cfg
from otaclient_common.common import create_tmp_fname
Expand Down Expand Up @@ -258,8 +260,13 @@ def calculate_delta(self) -> None:
thread_local=thread_local,
).add_done_callback(self._task_done_callback)

# NOTE: fill up the holes created by DELETE.
self._rstable_orm.orm_execute("VACUUM;")
# NOTE: fill up the holes created by DELETE, and make
# the rowid continues again.
sort_and_place(
self._rstable_orm,
ResourceTable.table_name,
order_by_col="rowid",
)
finally:
pool.shutdown(wait=True)
self._ft_regular_orm._con.close()
Expand Down

0 comments on commit f68b250

Please sign in to comment.