Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tqdm to pandas_to_eland() #299

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 48 additions & 15 deletions eland/etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import pandas as pd # type: ignore
from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk
from tqdm.notebook import tqdm # type: ignore

from eland import DataFrame
from eland.common import DEFAULT_CHUNK_SIZE, PANDAS_VERSION, ensure_es_client
Expand All @@ -46,6 +47,7 @@ def pandas_to_eland(
thread_count: int = 4,
chunksize: Optional[int] = None,
use_pandas_index_for_es_ids: bool = True,
show_progressbar: Optional[bool] = None,
) -> DataFrame:
"""
Append a pandas DataFrame to an Elasticsearch index.
Expand Down Expand Up @@ -79,6 +81,10 @@ def pandas_to_eland(
use_pandas_index_for_es_ids: bool, default 'True'
* True: pandas.DataFrame.index fields will be used to populate Elasticsearch '_id' fields.
* False: Ignore pandas.DataFrame.index when indexing into Elasticsearch
show_progressbar: Optional[bool], default 'None'
* True: show a progress bar, but (for now) only if a Jupyter Notebook is detected
* False: don't show a progress bar
* None: show a progress bar only if a Jupyter Notebook is detected

Returns
-------
Expand Down Expand Up @@ -184,35 +190,62 @@ def pandas_to_eland(
else:
es_client.indices.create(index=es_dest_index, body=mapping)

if show_progressbar is None or show_progressbar is True:
# Detect Jupyter Notebook
try:
from IPython import get_ipython # type: ignore

ip = get_ipython()
if hasattr(ip, "kernel"):
show_progressbar = True
except ImportError:
show_progressbar = False

def action_generator(
pd_df: pd.DataFrame,
es_dropna: bool,
use_pandas_index_for_es_ids: bool,
es_dest_index: str,
show_progressbar: Optional[bool],
) -> Generator[Dict[str, Any], None, None]:
for row in pd_df.iterrows():
if es_dropna:
values = row[1].dropna().to_dict()
else:
values = row[1].to_dict()

if use_pandas_index_for_es_ids:
# Use index as _id
id = row[0]

action = {"_index": es_dest_index, "_source": values, "_id": str(id)}
else:
action = {"_index": es_dest_index, "_source": values}

yield action
with tqdm(
total=pd_df.shape[0],
disable=not show_progressbar,
desc="Progress",
) as progress_bar:
for row in pd_df.iterrows():
if es_dropna:
values = row[1].dropna().to_dict()
else:
values = row[1].to_dict()

if use_pandas_index_for_es_ids:
# Use index as _id
id = row[0]

action = {
"_index": es_dest_index,
"_source": values,
"_id": str(id),
}
else:
action = {"_index": es_dest_index, "_source": values}

progress_bar.update(1)
yield action

# parallel_bulk is a lazy generator, so use deque to consume it immediately
# maxlen = 0 because we don't need the results of parallel_bulk
deque(
parallel_bulk(
client=es_client,
actions=action_generator(
pd_df, es_dropna, use_pandas_index_for_es_ids, es_dest_index
pd_df=pd_df,
es_dropna=es_dropna,
use_pandas_index_for_es_ids=use_pandas_index_for_es_ids,
es_dest_index=es_dest_index,
show_progressbar=show_progressbar,
),
thread_count=thread_count,
chunk_size=int(chunksize / thread_count),
Expand Down
4 changes: 4 additions & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,7 @@ nox
lightgbm
pytest-cov
mypy
tqdm
jupyter
notebook
ipywidgets
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
elasticsearch>=7.7
pandas>=1
matplotlib
tqdm
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
"pandas>=1.2,<1.4",
"matplotlib",
"numpy",
"tqdm",
],
python_requires=">=3.7",
package_data={"eland": ["py.typed"]},
Expand Down
4 changes: 3 additions & 1 deletion tests/etl/test_pandas_to_eland.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,16 @@ def test_es_if_exists_fail(self):
"to 'append' or 'replace' data."
)

def test_es_if_exists_replace(self):
@pytest.mark.parametrize("show_progressbar", [True, False, None])
def test_es_if_exists_replace(self, show_progressbar):
# Assert that 'replace' allows for creation
df1 = pandas_to_eland(
pd_df2,
es_client=ES_TEST_CLIENT,
es_dest_index="test-index",
es_if_exists="replace",
es_refresh=True,
show_progressbar=show_progressbar,
).to_pandas()
assert_frame_equal(pd_df2, df1)

Expand Down
Loading