diff --git a/eland/etl.py b/eland/etl.py index 3b32872d..a13c32ba 100644 --- a/eland/etl.py +++ b/eland/etl.py @@ -22,6 +22,7 @@ import pandas as pd # type: ignore from elasticsearch import Elasticsearch from elasticsearch.helpers import parallel_bulk +from tqdm.notebook import tqdm # type: ignore from eland import DataFrame from eland.common import DEFAULT_CHUNK_SIZE, PANDAS_VERSION, ensure_es_client @@ -46,6 +47,7 @@ def pandas_to_eland( thread_count: int = 4, chunksize: Optional[int] = None, use_pandas_index_for_es_ids: bool = True, + show_progressbar: Optional[bool] = None, ) -> DataFrame: """ Append a pandas DataFrame to an Elasticsearch index. @@ -79,6 +81,10 @@ def pandas_to_eland( use_pandas_index_for_es_ids: bool, default 'True' * True: pandas.DataFrame.index fields will be used to populate Elasticsearch '_id' fields. * False: Ignore pandas.DataFrame.index when indexing into Elasticsearch + show_progressbar: Optional[bool], default 'None' + * True : show a progress bar only if we detect Jupyter Notebook (for now) + * False : don't show a progress bar + * None : show a progress bar only if we detect Jupyter Notebook Returns ------- @@ -184,27 +190,50 @@ def pandas_to_eland( else: es_client.indices.create(index=es_dest_index, body=mapping) + if show_progressbar is None or show_progressbar is True: + # Detect jupyter notebook + try: + from IPython import get_ipython # type: ignore + + ip = get_ipython() + if hasattr(ip, "kernel"): + show_progressbar = True + except ImportError: + show_progressbar = False + def action_generator( pd_df: pd.DataFrame, es_dropna: bool, use_pandas_index_for_es_ids: bool, es_dest_index: str, + show_progressbar: Optional[bool], ) -> Generator[Dict[str, Any], None, None]: - for row in pd_df.iterrows(): - if es_dropna: - values = row[1].dropna().to_dict() - else: - values = row[1].to_dict() - - if use_pandas_index_for_es_ids: - # Use index as _id - id = row[0] - - action = {"_index": es_dest_index, "_source": values, "_id": str(id)} - else: - action = {"_index": es_dest_index, "_source": values} - yield action + with tqdm( + total=pd_df.shape[0], + disable=not show_progressbar, + desc="Progress", + ) as progress_bar: + for row in pd_df.iterrows(): + if es_dropna: + values = row[1].dropna().to_dict() + else: + values = row[1].to_dict() + + if use_pandas_index_for_es_ids: + # Use index as _id + id = row[0] + + action = { + "_index": es_dest_index, + "_source": values, + "_id": str(id), + } + else: + action = {"_index": es_dest_index, "_source": values} + + progress_bar.update(1) + yield action # parallel_bulk is lazy generator so use deque to consume them immediately # maxlen = 0 because don't need results of parallel_bulk @@ -212,7 +241,11 @@ def action_generator( parallel_bulk( client=es_client, actions=action_generator( - pd_df, es_dropna, use_pandas_index_for_es_ids, es_dest_index + pd_df=pd_df, + es_dropna=es_dropna, + use_pandas_index_for_es_ids=use_pandas_index_for_es_ids, + es_dest_index=es_dest_index, + show_progressbar=show_progressbar, ), thread_count=thread_count, chunk_size=int(chunksize / thread_count), diff --git a/requirements-dev.txt b/requirements-dev.txt index 9310580a..0119ffa2 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -11,3 +11,7 @@ nox lightgbm pytest-cov mypy +tqdm +jupyter +notebook +ipywidgets diff --git a/requirements.txt b/requirements.txt index 3b229a4e..cb69e57c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ elasticsearch>=7.7 pandas>=1 matplotlib +tqdm \ No newline at end of file diff --git a/setup.py b/setup.py index 279b973d..4952ab34 100644 --- a/setup.py +++ b/setup.py @@ -75,6 +75,7 @@ "pandas>=1.2,<1.4", "matplotlib", "numpy", + "tqdm", ], python_requires=">=3.7", package_data={"eland": ["py.typed"]}, diff --git a/tests/etl/test_pandas_to_eland.py b/tests/etl/test_pandas_to_eland.py index c99f57c3..954e469c 100644 --- a/tests/etl/test_pandas_to_eland.py +++ b/tests/etl/test_pandas_to_eland.py @@ -70,7 +70,8 @@ def test_es_if_exists_fail(self): "to 'append' or 'replace' data." ) - def test_es_if_exists_replace(self): + @pytest.mark.parametrize("show_progressbar", [True, False, None]) + def test_es_if_exists_replace(self, show_progressbar): # Assert that 'replace' allows for creation df1 = pandas_to_eland( pd_df2, @@ -78,6 +79,7 @@ def test_es_if_exists_replace(self): es_dest_index="test-index", es_if_exists="replace", es_refresh=True, + show_progressbar=show_progressbar, ).to_pandas() assert_frame_equal(pd_df2, df1) diff --git a/tests/notebook/test_etl.ipynb b/tests/notebook/test_etl.ipynb index 5f16c202..83d26d3c 100644 --- a/tests/notebook/test_etl.ipynb +++ b/tests/notebook/test_etl.ipynb @@ -40,8 +40,8 @@ "name": "stdout", "output_type": "stream", "text": [ - "2021-03-30 11:57:39.116425: read 10000 rows\n", - "2021-03-30 11:57:39.522722: read 13059 rows\n" + "2020-11-17 00:02:17.869106: read 10000 rows\n", + "2020-11-17 00:02:18.873135: read 13059 rows\n" ] } ], @@ -75,8 +75,146 @@ "outputs": [ { "data": { - "text/plain": " account length area code churn customer service calls \\\n0 128 415 0 1 \n1 107 415 0 1 \n\n international plan number vmail messages phone number state \\\n0 no 25 382-4657 KS \n1 no 26 371-7191 OH \n\n total day calls total day charge ... total eve calls total eve charge \\\n0 110 45.07 ... 99 16.78 \n1 123 27.47 ... 103 16.62 \n\n total eve minutes total intl calls total intl charge total intl minutes \\\n0 197.4 3 2.7 10.0 \n1 195.5 3 3.7 13.7 \n\n total night calls total night charge total night minutes voice mail plan \n0 91 11.01 244.7 yes \n1 103 11.45 254.4 yes \n\n[2 rows x 21 columns]", - "text/html": "
\n\n\n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n
account lengtharea codechurncustomer service callsinternational plannumber vmail messagesphone numberstatetotal day callstotal day charge...total eve callstotal eve chargetotal eve minutestotal intl callstotal intl chargetotal intl minutestotal night callstotal night chargetotal night minutesvoice mail plan
012841501no25382-4657KS11045.07...9916.78197.432.710.09111.01244.7yes
110741501no26371-7191OH12327.47...10316.62195.533.713.710311.45254.4yes
\n
\n

2 rows × 21 columns

" + "application/vnd.jupyter.widget-view+json": { + "model_id": "a1871aae58ca4aada70ab4df4e1b9673", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "HBox(children=(HTML(value='Progress'), FloatProgress(value=0.0, max=2.0), HTML(value='')))" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n" + ] + }, + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
account lengtharea codechurncustomer service callsinternational plannumber vmail messagesphone numberstatetotal day callstotal day charge...total eve callstotal eve chargetotal eve minutestotal intl callstotal intl chargetotal intl minutestotal night callstotal night chargetotal night minutesvoice mail plan
012841501no25382-4657KS11045.07...9916.78197.432.710.09111.01244.7yes
110741501no26371-7191OH12327.47...10316.62195.533.713.710311.45254.4yes
\n", + "
\n", + "

2 rows × 21 columns

" + ], + "text/plain": [ + " account length area code churn customer service calls \\\n", + "0 128 415 0 1 \n", + "1 107 415 0 1 \n", + "\n", + " international plan number vmail messages phone number state \\\n", + "0 no 25 382-4657 KS \n", + "1 no 26 371-7191 OH \n", + "\n", + " total day calls total day charge ... total eve calls total eve charge \\\n", + "0 110 45.07 ... 99 16.78 \n", + "1 123 27.47 ... 103 16.62 \n", + "\n", + " total eve minutes total intl calls total intl charge total intl minutes \\\n", + "0 197.4 3 2.7 10.0 \n", + "1 195.5 3 3.7 13.7 \n", + "\n", + " total night calls total night charge total night minutes voice mail plan \n", + "0 91 11.01 244.7 yes \n", + "1 103 11.45 254.4 yes \n", + "\n", + "[2 rows x 21 columns]" + ] }, "execution_count": 5, "metadata": {}, @@ -95,7 +233,38 @@ "outputs": [ { "data": { - "text/plain": "{'took': 0,\n 'timed_out': False,\n '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0},\n 'hits': {'total': {'value': 2, 'relation': 'eq'},\n 'max_score': 1.0,\n 'hits': [{'_index': 'churn',\n '_id': '0',\n '_score': 1.0,\n '_source': {'state': 'KS',\n 'account length': 128,\n 'area code': 415,\n 'phone number': '382-4657',\n 'international plan': 'no',\n 'voice mail plan': 'yes',\n 'number vmail messages': 25,\n 'total day minutes': 265.1,\n 'total day calls': 110,\n 'total day charge': 45.07,\n 'total eve minutes': 197.4,\n 'total eve calls': 99,\n 'total eve charge': 16.78,\n 'total night minutes': 244.7,\n 'total night calls': 91,\n 'total night charge': 11.01,\n 'total intl minutes': 10.0,\n 'total intl calls': 3,\n 'total intl charge': 2.7,\n 'customer service calls': 1,\n 'churn': 0}}]}}" + "text/plain": [ + "{'took': 4,\n", + " 'timed_out': False,\n", + " '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0},\n", + " 'hits': {'total': {'value': 2, 'relation': 'eq'},\n", + " 'max_score': 1.0,\n", + " 'hits': [{'_index': 'churn',\n", + " '_type': '_doc',\n", + " '_id': '0',\n", + " '_score': 1.0,\n", + " '_source': {'state': 'KS',\n", + " 'account length': 128,\n", + " 'area code': 415,\n", + " 'phone number': '382-4657',\n", + " 'international plan': 'no',\n", + " 'voice mail plan': 'yes',\n", + " 'number vmail messages': 25,\n", + " 'total day minutes': 265.1,\n", + " 'total day calls': 110,\n", + " 'total day charge': 45.07,\n", + " 'total eve minutes': 197.4,\n", + " 'total eve calls': 99,\n", + " 'total eve charge': 16.78,\n", + " 'total night minutes': 244.7,\n", + " 'total night calls': 91,\n", + " 'total night charge': 11.01,\n", + " 'total intl minutes': 10.0,\n", + " 'total intl calls': 3,\n", + " 'total intl charge': 2.7,\n", + " 'customer service calls': 1,\n", + " 'churn': 0}}]}}" + ] }, "execution_count": 6, "metadata": {}, @@ -124,6 +293,147 @@ "source": [ "es.indices.delete(index='churn', ignore=[400, 404])" ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "b70a238f828c41cd9ebbb1e74164b057", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "HBox(children=(HTML(value='Progress'), FloatProgress(value=0.0, max=13059.0), HTML(value='')))" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n" + ] + } + ], + "source": [ + "# NBVAL_IGNORE_OUTPUT\n", + "ed_df = ed.pandas_to_eland(pd_df, es_client='localhost', es_dest_index=\"pandas_flights\", show_progressbar=True)" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'acknowledged': True}" + ] + }, + "execution_count": 9, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "es.indices.delete(index='pandas_flights', ignore=[400, 404])" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [], + "source": [ + "# NBVAL_IGNORE_OUTPUT\n", + "ed_df = ed.pandas_to_eland(pd_df, es_client='localhost', es_dest_index=\"pandas_flights\", show_progressbar=False)" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'acknowledged': True}" + ] + }, + "execution_count": 11, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "es.indices.delete(index='pandas_flights', ignore=[400, 404])" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [ + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "877234408b3d40f7af37b0bd9dec9c05", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "HBox(children=(HTML(value='Progress'), FloatProgress(value=0.0, max=13059.0), HTML(value='')))" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n" + ] + } + ], + "source": [ + "# NBVAL_IGNORE_OUTPUT\n", + "ed_df = ed.pandas_to_eland(pd_df, es_client='localhost', es_dest_index=\"pandas_flights\", show_progressbar=None)" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'acknowledged': True}" + ] + }, + "execution_count": 13, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "es.indices.delete(index='pandas_flights', ignore=[400, 404])" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] } ], "metadata": { @@ -147,4 +457,4 @@ }, "nbformat": 4, "nbformat_minor": 4 -} \ No newline at end of file +}