Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(Chore): Refine Single Table Migration #384

Merged
merged 2 commits on Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions bin/migrate_to_single_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ def load_json(file_name):

def main():
args = parse_args()
migrator = IndexRecordMigrator(creds_file=args.creds_file)
migrator = IndexRecordMigrator(
creds_file=args.creds_file, batch_size=args.batch_size
)
migrator.index_record_to_new_table(
offset=args.start_offset, last_seen_guid=args.start_did
)
Expand All @@ -61,14 +63,22 @@ def parse_args():
parser.add_argument(
"--start-offset",
dest="start_offset",
type=int,
help="offset to start at",
default=None,
)
parser.add_argument(
"--batch-size",
dest="batch_size",
help="number of records to batch select from source table (default: 1000)",
type=int,
default=1000,
)
return parser.parse_args()


class IndexRecordMigrator:
def __init__(self, creds_file=None):
def __init__(self, creds_file=None, batch_size=None):
self.logger = get_logger("migrate_single_table", log_level="debug")

conf_data = load_json(creds_file) if creds_file else load_json("creds.json")
Expand All @@ -78,6 +88,7 @@ def __init__(self, creds_file=None):
psw = conf_data.get("db_password", "{{db_password}}")
pghost = conf_data.get("db_host", "{{db_host}}")
pgport = 5432
self.batch_size = batch_size

try:
engine = create_engine(
Expand All @@ -91,9 +102,7 @@ def __init__(self, creds_file=None):

self.session = Session()

def index_record_to_new_table(
self, batch_size=1000, offset=None, last_seen_guid=None
):
def index_record_to_new_table(self, offset=None, last_seen_guid=None):
"""
Collect records from index_record table, collect additional info from multiple tables and bulk insert to new record table.
"""
Expand All @@ -106,23 +115,24 @@ def index_record_to_new_table(
records = (
self.session.query(IndexRecord)
.order_by(IndexRecord.did)
.limit(batch_size)
.limit(self.batch_size)
.all()
)
elif offset is not None:
records = (
self.session.query(IndexRecord)
.order_by(IndexRecord.did)
.offset(offset - 1)
.limit(batch_size)
.limit(self.batch_size)
.all()
)
else:
self.logger.info(f"Start guid set to: {last_seen_guid}")
records = (
self.session.query(IndexRecord)
.order_by(IndexRecord.did)
.filter(IndexRecord.did > last_seen_guid)
.limit(batch_size)
.limit(self.batch_size)
.all()
)

Expand Down
6 changes: 4 additions & 2 deletions tests/test_migrate_to_single_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,12 @@ def test_index_record_to_new_table():
"""
Test index_record_to_new_table copies records from old tables to new record table.
"""
index_record_migrator = IndexRecordMigrator(creds_file="tests/test_creds.json")
index_record_migrator = IndexRecordMigrator(
creds_file="tests/test_creds.json", batch_size=10
)
n_records = 100
create_record(n_records)
index_record_migrator.index_record_to_new_table(batch_size=10)
index_record_migrator.index_record_to_new_table()

engine = create_engine(POSTGRES_CONNECTION)
with engine.connect() as conn:
Expand Down
Loading