Support doing a full sync without deleting the existing index #81

Open · wants to merge 3 commits into dev
Changes from 1 commit
feat: add command line option to avoid deleting the index when doing a refresh

matinone committed Jan 30, 2024
commit d0ce6ced7aca57d4914d1bb0128cee85f9f15d10
4 changes: 4 additions & 0 deletions meilisync/main.py
@@ -149,6 +149,9 @@ def refresh(
     size: int = typer.Option(
         10000, "-s", "--size", help="Size of data for each insert to be inserted into MeiliSearch"
     ),
+    keep_index: bool = typer.Option(
+        False, "-k", "--keep-index", help="Keep the existing index instead of deleting it before the sync"
+    ),
 ):
     async def _():
         settings = context.obj["settings"]
@@ -162,6 +165,7 @@ async def _():
             count = await meili.refresh_data(
                 sync,
                 source.get_full_data(sync, size),
+                keep_index,
             )
             if count:
                 logger.info(
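
For reviewers who want to try the new option in isolation, here is a minimal, self-contained sketch of the boolean-flag pattern the diff uses (assuming Typer is installed; the `refresh` stub and its `echo` output are illustrative stand-ins, not meilisync's real entry point):

```python
import typer

app = typer.Typer()


@app.command()
def refresh(
    keep_index: bool = typer.Option(
        False, "-k", "--keep-index", help="Keep the existing index instead of deleting it"
    ),
):
    # Typer treats this as an opt-in flag: omitted -> False, passed -> True,
    # so existing `refresh` invocations keep the current delete-and-swap behavior.
    typer.echo(f"keep_index={keep_index}")


if __name__ == "__main__":
    app()
```

Running this sketch with `--keep-index` (or `-k`) prints `keep_index=True`; running it with no flags prints `keep_index=False`, which preserves the current default.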
50 changes: 29 additions & 21 deletions meilisync/meili.py
@@ -31,22 +31,27 @@ async def add_data(self, sync: Sync, data: list):
         events = [Event(type=EventType.create, data=item) for item in data]
         return await self.handle_events_by_type(sync, events, EventType.create)

-    async def refresh_data(self, sync: Sync, data: AsyncGenerator):
+    async def refresh_data(self, sync: Sync, data: AsyncGenerator, keep_index: bool = False):
         index = sync.index_name
         pk = sync.pk
-        sync.index = index_name_tmp = f"{index}_tmp"
-        try:
-            await self.client.index(index_name_tmp).delete()
-        except MeilisearchApiError as e:
-            if e.code != "MeilisearchApiError.index_not_found":
-                raise
-        settings = await self.client.index(index).get_settings()
-        index_tmp = await self.client.create_index(index_name_tmp, primary_key=pk)
-        task = await index_tmp.update_settings(settings)
-        logger.info(f"Waiting for update tmp index {index_name_tmp} settings to complete...")
-        await self.client.wait_for_task(
-            task_id=task.task_uid, timeout_in_ms=self.wait_for_task_timeout
-        )
+        if not keep_index:
+            sync.index = index_name_tmp = f"{index}_tmp"
+            try:
+                await self.client.index(index_name_tmp).delete()
+            except MeilisearchApiError as e:
+                if e.code != "MeilisearchApiError.index_not_found":
+                    raise
+            settings = await self.client.index(index).get_settings()
+            index_tmp = await self.client.create_index(index_name_tmp, primary_key=pk)
+            task = await index_tmp.update_settings(settings)
+            logger.info(f"Waiting for update tmp index {index_name_tmp} settings to complete...")
+            await self.client.wait_for_task(
+                task_id=task.task_uid, timeout_in_ms=self.wait_for_task_timeout
+            )
+        else:
+            logger.info("Not deleting index when refreshing data")
+            index_name_tmp = index
+
         tasks = []
         count = 0
         async for items in data:
@@ -61,13 +66,16 @@ async def refresh_data(self, sync: Sync, data: AsyncGenerator):
             ]
             logger.info(f"Waiting for insert tmp index {index_name_tmp} to complete...")
             await asyncio.gather(*wait_tasks)
-        task = await self.client.swap_indexes([(index, index_name_tmp)])
-        logger.info(f"Waiting for swap index {index} to complete...")
-        await self.client.wait_for_task(
-            task_id=task.task_uid, timeout_in_ms=self.wait_for_task_timeout
-        )
-        await self.client.index(index_name_tmp).delete()
-        logger.success(f"Swap index {index} complete")
+
+        if not keep_index:
+            task = await self.client.swap_indexes([(index, index_name_tmp)])
+            logger.info(f"Waiting for swap index {index} to complete...")
+            await self.client.wait_for_task(
+                task_id=task.task_uid, timeout_in_ms=self.wait_for_task_timeout
+            )
+            await self.client.index(index_name_tmp).delete()
+            logger.success(f"Swap index {index} complete")
+
         return count

     async def get_count(self, index: str):
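
Taken together, the two hunks give `refresh_data` two paths. Below is a condensed sketch of just the index lifecycle, reusing the same `meilisearch-python-sdk` `AsyncClient` calls that appear in the diff; the server URL, API key, and the `movies` index name are illustrative assumptions:

```python
import asyncio

from meilisearch_python_sdk import AsyncClient


async def refresh_index_lifecycle(keep_index: bool) -> None:
    # Illustrative connection details, not meilisync's real configuration.
    client = AsyncClient("http://127.0.0.1:7700", "masterKey")
    index = "movies"
    if not keep_index:
        # Default path: documents are written to "movies_tmp", which is then
        # atomically swapped with the live index; the leftover tmp index
        # (now holding the old documents) is deleted afterwards.
        index_name_tmp = f"{index}_tmp"
        task = await client.swap_indexes([(index, index_name_tmp)])
        await client.wait_for_task(task_id=task.task_uid)
        await client.index(index_name_tmp).delete()
    # else: with --keep-index there is no tmp index, no swap, and no delete;
    # documents are upserted straight into the live index.


asyncio.run(refresh_index_lifecycle(keep_index=True))
```

One behavioral trade-off worth flagging in review: with `--keep-index` the refresh only upserts, so documents deleted at the source since the last sync will linger in Meilisearch, whereas the default swap path rebuilds the index from scratch.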