From 55c0de2df00600b1658522a55a41f05bdd280e61 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Fri, 14 Mar 2025 14:18:45 +0100 Subject: [PATCH 01/15] fix: improves merge of flight_client_options, introduces method to prepare query ticket and options --- influxdb_client_3/query/query_api.py | 77 ++++++++++++++++--------- tests/test_query.py | 86 ++++++++++++++++++++++++++++ 2 files changed, 136 insertions(+), 27 deletions(-) diff --git a/influxdb_client_3/query/query_api.py b/influxdb_client_3/query/query_api.py index ee0725f..6f9b6cc 100644 --- a/influxdb_client_3/query/query_api.py +++ b/influxdb_client_3/query/query_api.py @@ -1,9 +1,11 @@ """Query data in InfluxDB 3.""" - +import asyncio +import concurrent.futures # coding: utf-8 import json from pyarrow.flight import FlightClient, Ticket, FlightCallOptions, FlightStreamReader + from influxdb_client_3.version import USER_AGENT @@ -121,19 +123,28 @@ def __init__(self, """ self._token = token self._flight_client_options = flight_client_options or {} + default_user_agent = ("grpc.secondary_user_agent", USER_AGENT) + if "generic_options" in self._flight_client_options: + if "grpc.secondary_user_agent" not in dict(self._flight_client_options["generic_options"]).keys(): + self._flight_client_options["generic_options"].append(default_user_agent) + else: + self._flight_client_options["generic_options"] = [default_user_agent] self._proxy = proxy + from influxdb_client_3 import _merge_options as merge_options if options: if options.flight_client_options: - self._flight_client_options = options.flight_client_options + self._flight_client_options = merge_options(self._flight_client_options, + None, + options.flight_client_options) + if ('generic_options' in options.flight_client_options and + 'grpc.secondary_user_agent' in dict(options.flight_client_options["generic_options"]).keys()): + self._flight_client_options['generic_options'].remove(default_user_agent) if options.tls_root_certs: self._flight_client_options["tls_root_certs"] = options.tls_root_certs if options.proxy: self._proxy = options.proxy if options.tls_verify is not None: self._flight_client_options["disable_server_verification"] = not options.tls_verify - self._flight_client_options["generic_options"] = [ - ("grpc.secondary_user_agent", USER_AGENT) - ] if self._proxy: self._flight_client_options["generic_options"].append(("grpc.http_proxy", self._proxy)) self._flight_client = FlightClient(connection_string, **self._flight_client_options) @@ -152,30 +163,10 @@ def query(self, query: str, language: str, mode: str, database: str, **kwargs): It should be a ``dictionary`` of key-value pairs. :return: The query result in the specified mode. """ - from influxdb_client_3 import polars as has_polars, _merge_options as merge_options + from influxdb_client_3 import polars as has_polars try: - # Create an authorization header - optargs = { - "headers": [(b"authorization", f"Bearer {self._token}".encode('utf-8'))], - "timeout": 300 - } - opts = merge_options(optargs, exclude_keys=['query_parameters'], custom=kwargs) - _options = FlightCallOptions(**opts) - - # - # Ticket data - # - ticket_data = { - "database": database, - "sql_query": query, - "query_type": language - } - # add query parameters - query_parameters = kwargs.get("query_parameters", None) - if query_parameters: - ticket_data["params"] = query_parameters + ticket, _options = self._prepare_query(query, language, database, **kwargs) - ticket = Ticket(json.dumps(ticket_data).encode('utf-8')) flight_reader = self._do_get(ticket, _options) mode_funcs = { @@ -194,9 +185,41 @@ def query(self, query: str, language: str, mode: str, database: str, **kwargs): except Exception as e: raise e + def _prepare_query(self, query: str, language: str, database: str, **kwargs): + from influxdb_client_3 import _merge_options as merge_options + # Create an authorization header + optargs = { + "headers": [(b"authorization", f"Bearer {self._token}".encode('utf-8'))], + "timeout": 300 + } + opts = merge_options(optargs, exclude_keys=['query_parameters'], custom=kwargs) + _options = FlightCallOptions(**opts) + + # + # Ticket data + # + ticket_data = { + "database": database, + "sql_query": query, + "query_type": language + } + # add query parameters + query_parameters = kwargs.get("query_parameters", None) + if query_parameters: + ticket_data["params"] = query_parameters + + ticket = Ticket(json.dumps(ticket_data).encode('utf-8')) + + return ticket, _options + def _do_get(self, ticket: Ticket, options: FlightCallOptions = None) -> FlightStreamReader: return self._flight_client.do_get(ticket, options) + async def _do_get_async(self, ticket: Ticket, options: FlightCallOptions = None): + loop = asyncio.get_running_loop() + return loop.run_in_executor(concurrent.futures.ThreadPoolExecutor, + self._flight_client.do_get, ticket, options) + def close(self): """Close the Flight client.""" self._flight_client.close() diff --git a/tests/test_query.py b/tests/test_query.py index d529ffb..6874d11 100644 --- a/tests/test_query.py +++ b/tests/test_query.py @@ -1,6 +1,7 @@ import unittest import struct import os +import json from unittest.mock import Mock, ANY from pyarrow import ( @@ -9,6 +10,7 @@ ) from pyarrow.flight import ( + FlightClient, FlightServerBase, FlightUnauthenticatedError, GeneratorStream, @@ -282,3 +284,87 @@ def test_client_with_ssl_args(self): assert dict(fc_opts['generic_options'])['grpc.http_proxy'] == proxy finally: self.remove_cert_file(cert_name) + + def test_multiple_flight_client_options(self): + + q_opts = QueryApiOptionsBuilder().flight_client_options({ + 'generic_options': [('optA', 'A in options')] + }).build() + + q_api = QueryApi( + connection_string="grpc+tls://my-server.org", + token='my_token', + flight_client_options={"generic_options": [('opt1', 'opt1 in args')]}, + proxy=None, + options=q_opts + ) + + assert ('opt1', 'opt1 in args') in q_api._flight_client_options['generic_options'] + assert ('optA', 'A in options') in q_api._flight_client_options['generic_options'] + assert (('grpc.secondary_user_agent', 'influxdb3-python/0.12.0dev0') in + q_api._flight_client_options['generic_options']) + + def test_override_secondary_user_agent_args(self): + q_api = QueryApi( + connection_string="grpc+tls://my-server.org", + token='my_token', + flight_client_options={"generic_options": [('grpc.secondary_user_agent', 'my_custom_user_agent')]}, + proxy=None, + options=None + ) + + assert ('grpc.secondary_user_agent', 'my_custom_user_agent') in q_api._flight_client_options['generic_options'] + assert not (('grpc.secondary_user_agent', 'influxdb3-python/0.12.0dev0') in + q_api._flight_client_options['generic_options']) + + def test_secondary_user_agent_in_options(self): + q_opts = QueryApiOptionsBuilder().flight_client_options({ + 'generic_options': [ + ('optA', 'A in options'), + ('grpc.secondary_user_agent', 'my_custom_user_agent') + ] + }).build() + + q_api = QueryApi( + connection_string="grpc+tls://my-server.org", + token='my_token', + flight_client_options=None, + proxy=None, + options=q_opts + ) + + assert ('optA', 'A in options') in q_api._flight_client_options['generic_options'] + assert ('grpc.secondary_user_agent', 'my_custom_user_agent') in q_api._flight_client_options['generic_options'] + assert (('grpc.secondary_user_agent', 'influxdb3-python/0.12.0dev0') not in + q_api._flight_client_options['generic_options']) + + def test_prepare_query(self): + global _req_headers + token = 'my_token' + q_api = QueryApi( + connection_string="grpc+tls://my-server.org", + token=token, + flight_client_options={"generic_options": [('Foo', 'Bar')]}, + proxy=None, + options=None + ) + + query = "SELECT * FROM sensors" + language = "sql" + database = "my_database" + + ticket, options = q_api._prepare_query(query=query, + language=language, + database=database) + tkt = json.loads(ticket.ticket.decode('utf-8')) + assert tkt['database'] == database + assert tkt['sql_query'] == query + assert tkt['query_type'] == language + + with HeaderCheckFlightServer( + auth_handler=NoopAuthHandler(), + middleware={"check": HeaderCheckServerMiddlewareFactory()}) as server: + with FlightClient(('localhost', server.port)) as client: + client.do_get(ticket, options) + assert _req_headers['authorization'] == [f"Bearer {token}"] + _req_headers = {} From 8e46dd5e2fed7e7c8b55f77da6fcc55635fd8359 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Fri, 14 Mar 2025 16:54:08 +0100 Subject: [PATCH 02/15] feat: (WIP) start query_async() in query_api --- influxdb_client_3/query/query_api.py | 39 +++++++++++++++++++--------- 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/influxdb_client_3/query/query_api.py b/influxdb_client_3/query/query_api.py index 6f9b6cc..4525891 100644 --- a/influxdb_client_3/query/query_api.py +++ b/influxdb_client_3/query/query_api.py @@ -169,17 +169,37 @@ def query(self, query: str, language: str, mode: str, database: str, **kwargs): flight_reader = self._do_get(ticket, _options) + return self._translate_stream_reader(flight_reader, mode) + except Exception as e: + raise e + + async def query_async(self, query: str, language: str, mode: str, database: str, **kwargs): + try: + ticket, options = self._prepare_query(query, language, database, **kwargs) + loop = asyncio.get_running_loop() + _flight_reader = await loop.run_in_executor(None, + self._flight_client.do_get, ticket, options) + return await loop.run_in_executor(None, self._translate_stream_reader, + _flight_reader, + mode) + except Exception as e: + print(f"\DEBUG caught exception e {e}") + raise e + + def _translate_stream_reader(self, reader: FlightStreamReader, mode: str): + from influxdb_client_3 import polars as has_polars + try: mode_funcs = { - "all": flight_reader.read_all, - "pandas": flight_reader.read_pandas, - "chunk": lambda: flight_reader, - "reader": flight_reader.to_reader, - "schema": lambda: flight_reader.schema + "all": reader.read_all, + "pandas": reader.read_pandas, + "chunk": lambda: reader, + "reader": reader.to_reader, + "schema": lambda: reader.schema } if has_polars: import polars as pl - mode_funcs["polars"] = lambda: pl.from_arrow(flight_reader.read_all()) - mode_func = mode_funcs.get(mode, flight_reader.read_all) + mode_funcs["polars"] = lambda: pl.from_arrow(reader.read_all()) + mode_func = mode_funcs.get(mode, reader.read_all) return mode_func() if callable(mode_func) else mode_func except Exception as e: @@ -215,11 +235,6 @@ def _prepare_query(self, query: str, language: str, database: str, **kwargs): def _do_get(self, ticket: Ticket, options: FlightCallOptions = None) -> FlightStreamReader: return self._flight_client.do_get(ticket, options) - async def _do_get_async(self, ticket: Ticket, options: FlightCallOptions = None): - loop = asyncio.get_running_loop() - return loop.run_in_executor(concurrent.futures.ThreadPoolExecutor, - self._flight_client.do_get, ticket, options) - def close(self): """Close the Flight client.""" self._flight_client.close() From 91004b14bf7226efb976f50960c10bfa777a9802 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Fri, 14 Mar 2025 16:57:28 +0100 Subject: [PATCH 03/15] chore: clean up flake from last commit --- influxdb_client_3/query/query_api.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/influxdb_client_3/query/query_api.py b/influxdb_client_3/query/query_api.py index 4525891..fd05cf9 100644 --- a/influxdb_client_3/query/query_api.py +++ b/influxdb_client_3/query/query_api.py @@ -1,6 +1,5 @@ """Query data in InfluxDB 3.""" import asyncio -import concurrent.futures # coding: utf-8 import json @@ -163,7 +162,6 @@ def query(self, query: str, language: str, mode: str, database: str, **kwargs): It should be a ``dictionary`` of key-value pairs. :return: The query result in the specified mode. """ - from influxdb_client_3 import polars as has_polars try: ticket, _options = self._prepare_query(query, language, database, **kwargs) @@ -178,12 +176,11 @@ async def query_async(self, query: str, language: str, mode: str, database: str, ticket, options = self._prepare_query(query, language, database, **kwargs) loop = asyncio.get_running_loop() _flight_reader = await loop.run_in_executor(None, - self._flight_client.do_get, ticket, options) + self._flight_client.do_get, ticket, options) return await loop.run_in_executor(None, self._translate_stream_reader, _flight_reader, mode) except Exception as e: - print(f"\DEBUG caught exception e {e}") raise e def _translate_stream_reader(self, reader: FlightStreamReader, mode: str): From 4d17837b9790709fdc21f77872a20b4ba4999463 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Mon, 17 Mar 2025 17:08:55 +0100 Subject: [PATCH 04/15] feat: (WIP) adds client.query_async() plus first tests --- influxdb_client_3/__init__.py | 16 +++++ tests/test_query.py | 115 ++++++++++++++++++++++++++++++++++ 2 files changed, 131 insertions(+) diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index 6f736d2..de26941 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -278,6 +278,22 @@ def query(self, query: str, language: str = "sql", mode: str = "all", database: except InfluxDBError as e: raise e + async def query_async(self, query: str, language: str = "sql", mode: str = "all", database: str = None, **kwargs): + if mode == "polars" and polars is False: + raise ImportError("Polars is not installed. Please install it with `pip install polars`.") + + if database is None: + database = self._database + + try: + return await self._query_api.query_async(query=query, + language=language, + mode=mode, + database=database, + **kwargs) + except InfluxDBError as e: + raise e + def close(self): """Close the client and clean up resources.""" self._write_api.close() diff --git a/tests/test_query.py b/tests/test_query.py index 6874d11..3891073 100644 --- a/tests/test_query.py +++ b/tests/test_query.py @@ -1,3 +1,5 @@ +import asyncio +import inspect import unittest import struct import os @@ -6,6 +8,7 @@ from pyarrow import ( array, + concat_tables, Table ) @@ -14,6 +17,7 @@ FlightServerBase, FlightUnauthenticatedError, GeneratorStream, + RecordBatchStream, ServerMiddleware, ServerMiddlewareFactory, ServerAuthHandler, @@ -25,6 +29,14 @@ from influxdb_client_3.version import USER_AGENT +def asyncio_run(async_func): + def wrapper(*args, **kwargs): + return asyncio.run(async_func(*args, **kwargs)) + + wrapper.__signature__ = inspect.signature(async_func) + return wrapper + + def case_insensitive_header_lookup(headers, lkey): """Lookup the value of a given key in the given headers. The lkey is case-insensitive. @@ -368,3 +380,106 @@ def test_prepare_query(self): client.do_get(ticket, options) assert _req_headers['authorization'] == [f"Bearer {token}"] _req_headers = {} + + @asyncio_run + async def test_query_async_pandas(self): + with ConstantFlightServer() as server: + connection_string = f"grpc://localhost:{server.port}" + token = "my_token" + database = "my_database" + q_api = QueryApi( + connection_string=connection_string, + token=token, + flight_client_options={"generic_options": [('Foo', 'Bar')]}, + proxy=None, + options=None + ) + + query = "SELECT * FROM data" + pndf = await q_api.query_async(query, "sql", "pandas", database) + + cd = ConstantData() + numpy_array = pndf.T.to_numpy() + tuples = [] + for n in range(len(numpy_array[0])): + tuples.append((numpy_array[0][n], numpy_array[1][n], numpy_array[2][n])) + + for constant in cd.to_tuples(): + assert constant in tuples + + assert ('sql_query', query, -1.0) in tuples + assert ('database', database, -1.0) in tuples + assert ('query_type', 'sql', -1.0) in tuples + + @asyncio_run + async def test_query_async_table(self): + with ConstantFlightServer() as server: + connection_string = f"grpc://localhost:{server.port}" + token = "my_token" + database = "my_database" + q_api = QueryApi( + connection_string=connection_string, + token=token, + flight_client_options={"generic_options": [('Foo', 'Bar')]}, + proxy=None, + options=None + ) + query = "SELECT * FROM data" + table = await q_api.query_async(query, "sql", "", database) + + cd = ConstantData() + + result_list = table.to_pylist() + for item in cd.to_list(): + assert item in result_list + + assert {'data': 'database', 'reference': 'my_database', 'value': -1.0} in result_list + assert {'data': 'sql_query', 'reference': 'SELECT * FROM data', 'value': -1.0} in result_list + assert {'data': 'query_type', 'reference': 'sql', 'value': -1.0} in result_list + + +class ConstantData: + + def __init__(self): + self.data = [ + array(['temp', 'temp', 'temp']), + array(['kitchen', 'common', 'foyer']), + array([36.9, 25.7, 9.8]) + ] + self.names = ['data', 'reference', 'value'] + + def to_tuples(self): + response = [] + for n in range(3): + response.append((self.data[0][n].as_py(), self.data[1][n].as_py(), self.data[2][n].as_py())) + return response + + def to_list(self): + response = [] + for it in range(len(self.data[0])): + item = {} + for o in range(len(self.names)): + item[self.names[o]] = self.data[o][it].as_py() + response.append(item) + return response + + +class ConstantFlightServer(FlightServerBase): + + def __init__(self, location=None, options=None, **kwargs): + super().__init__(location, **kwargs) + self.cd = ConstantData() + self.options = options + + # respond with Constant Data plus fields from ticket + def do_get(self, context, ticket): + result_table = Table.from_arrays(self.cd.data, names=self.cd.names) + tkt = json.loads(ticket.ticket.decode('utf-8')) + for key in tkt.keys(): + tkt_data = [ + array([key]), + array([tkt[key]]), + array([-1.0]) + ] + result_table = concat_tables([result_table, Table.from_arrays(tkt_data, names=self.cd.names)]) + return RecordBatchStream(result_table, options=self.options) From 99b4721a7e8547b34a655ac226b0e49c6e43af82 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Tue, 18 Mar 2025 10:48:44 +0100 Subject: [PATCH 05/15] test: refactor - move arrow flight server mocks to util/mocks.py package --- tests/test_query.py | 145 +++++----------------------------------- tests/util/__init__.py | 1 + tests/util/mocks.py | 148 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 165 insertions(+), 129 deletions(-) create mode 100644 tests/util/__init__.py create mode 100644 tests/util/mocks.py diff --git a/tests/test_query.py b/tests/test_query.py index 3891073..6985cf9 100644 --- a/tests/test_query.py +++ b/tests/test_query.py @@ -1,26 +1,12 @@ import asyncio import inspect import unittest -import struct import os import json from unittest.mock import Mock, ANY -from pyarrow import ( - array, - concat_tables, - Table -) - from pyarrow.flight import ( FlightClient, - FlightServerBase, - FlightUnauthenticatedError, - GeneratorStream, - RecordBatchStream, - ServerMiddleware, - ServerMiddlewareFactory, - ServerAuthHandler, Ticket ) @@ -28,6 +14,16 @@ from influxdb_client_3.query.query_api import QueryApiOptionsBuilder, QueryApi from influxdb_client_3.version import USER_AGENT +from tests.util.mocks import ( + ConstantData, + ConstantFlightServer, + HeaderCheckFlightServer, + HeaderCheckServerMiddlewareFactory, + NoopAuthHandler, + get_req_headers, + set_req_headers +) + def asyncio_run(async_func): def wrapper(*args, **kwargs): @@ -46,76 +42,12 @@ def case_insensitive_header_lookup(headers, lkey): return headers.get(key) -class NoopAuthHandler(ServerAuthHandler): - """A no-op auth handler - as seen in pyarrow tests""" - - def authenticate(self, outgoing, incoming): - """Do nothing""" - - def is_valid(self, token): - """ - Return an empty string - N.B. Returning None causes Type error - :param token: - :return: - """ - return "" - - -_req_headers = {} - - -class HeaderCheckServerMiddlewareFactory(ServerMiddlewareFactory): - """Factory to create HeaderCheckServerMiddleware and check header values""" - def start_call(self, info, headers): - auth_header = case_insensitive_header_lookup(headers, "Authorization") - values = auth_header[0].split(' ') - if values[0] != 'Bearer': - raise FlightUnauthenticatedError("Token required") - global _req_headers - _req_headers = headers - return HeaderCheckServerMiddleware(values[1]) - - -class HeaderCheckServerMiddleware(ServerMiddleware): - """ - Middleware needed to catch request headers via factory - N.B. As found in pyarrow tests - """ - def __init__(self, token, *args, **kwargs): - super().__init__(*args, **kwargs) - self.token = token - - def sending_headers(self): - return {'authorization': 'Bearer ' + self.token} - - -class HeaderCheckFlightServer(FlightServerBase): - """Mock server handle gRPC do_get calls""" - def do_get(self, context, ticket): - """Return something to avoid needless errors""" - data = [ - array([b"Vltava", struct.pack(' 0 assert _req_headers['authorization'][0] == "Bearer TEST_TOKEN" assert _req_headers['user-agent'][0].find(USER_AGENT) > -1 - _req_headers = {} + set_req_headers({}) class TestQuery(unittest.TestCase): @@ -351,7 +284,7 @@ def test_secondary_user_agent_in_options(self): q_api._flight_client_options['generic_options']) def test_prepare_query(self): - global _req_headers + set_req_headers({}) token = 'my_token' q_api = QueryApi( connection_string="grpc+tls://my-server.org", @@ -378,8 +311,9 @@ def test_prepare_query(self): middleware={"check": HeaderCheckServerMiddlewareFactory()}) as server: with FlightClient(('localhost', server.port)) as client: client.do_get(ticket, options) + _req_headers = get_req_headers() assert _req_headers['authorization'] == [f"Bearer {token}"] - _req_headers = {} + set_req_headers({}) @asyncio_run async def test_query_async_pandas(self): @@ -436,50 +370,3 @@ async def test_query_async_table(self): assert {'data': 'database', 'reference': 'my_database', 'value': -1.0} in result_list assert {'data': 'sql_query', 'reference': 'SELECT * FROM data', 'value': -1.0} in result_list assert {'data': 'query_type', 'reference': 'sql', 'value': -1.0} in result_list - - -class ConstantData: - - def __init__(self): - self.data = [ - array(['temp', 'temp', 'temp']), - array(['kitchen', 'common', 'foyer']), - array([36.9, 25.7, 9.8]) - ] - self.names = ['data', 'reference', 'value'] - - def to_tuples(self): - response = [] - for n in range(3): - response.append((self.data[0][n].as_py(), self.data[1][n].as_py(), self.data[2][n].as_py())) - return response - - def to_list(self): - response = [] - for it in range(len(self.data[0])): - item = {} - for o in range(len(self.names)): - item[self.names[o]] = self.data[o][it].as_py() - response.append(item) - return response - - -class ConstantFlightServer(FlightServerBase): - - def __init__(self, location=None, options=None, **kwargs): - super().__init__(location, **kwargs) - self.cd = ConstantData() - self.options = options - - # respond with Constant Data plus fields from ticket - def do_get(self, context, ticket): - result_table = Table.from_arrays(self.cd.data, names=self.cd.names) - tkt = json.loads(ticket.ticket.decode('utf-8')) - for key in tkt.keys(): - tkt_data = [ - array([key]), - array([tkt[key]]), - array([-1.0]) - ] - result_table = concat_tables([result_table, Table.from_arrays(tkt_data, names=self.cd.names)]) - return RecordBatchStream(result_table, options=self.options) diff --git a/tests/util/__init__.py b/tests/util/__init__.py new file mode 100644 index 0000000..3bf511f --- /dev/null +++ b/tests/util/__init__.py @@ -0,0 +1 @@ +"""Package for tests/util module.""" diff --git a/tests/util/mocks.py b/tests/util/mocks.py new file mode 100644 index 0000000..33ebe2e --- /dev/null +++ b/tests/util/mocks.py @@ -0,0 +1,148 @@ +import json +import struct + +from pyarrow import ( + array, + Table, + concat_tables +) + +from pyarrow.flight import ( + FlightServerBase, + RecordBatchStream, + ServerMiddlewareFactory, + FlightUnauthenticatedError, + ServerMiddleware, + GeneratorStream, + ServerAuthHandler +) + + +class NoopAuthHandler(ServerAuthHandler): + """A no-op auth handler - as seen in pyarrow tests""" + + def authenticate(self, outgoing, incoming): + """Do nothing""" + + def is_valid(self, token): + """ + Return an empty string + N.B. Returning None causes Type error + :param token: + :return: + """ + return "" + + +def case_insensitive_header_lookup(headers, lkey): + """Lookup the value of a given key in the given headers. + The lkey is case-insensitive. + """ + for key in headers: + if key.lower() == lkey.lower(): + return headers.get(key) + + +req_headers = {} + + +def set_req_headers(headers): + global req_headers + req_headers = headers + + +def get_req_headers(): + global req_headers + return req_headers + + +class ConstantData: + + def __init__(self): + self.data = [ + array(['temp', 'temp', 'temp']), + array(['kitchen', 'common', 'foyer']), + array([36.9, 25.7, 9.8]) + ] + self.names = ['data', 'reference', 'value'] + + def to_tuples(self): + response = [] + for n in range(3): + response.append((self.data[0][n].as_py(), self.data[1][n].as_py(), self.data[2][n].as_py())) + return response + + def to_list(self): + response = [] + for it in range(len(self.data[0])): + item = {} + for o in range(len(self.names)): + item[self.names[o]] = self.data[o][it].as_py() + response.append(item) + return response + + +class ConstantFlightServer(FlightServerBase): + + def __init__(self, location=None, options=None, **kwargs): + super().__init__(location, **kwargs) + self.cd = ConstantData() + self.options = options + + # respond with Constant Data plus fields from ticket + def do_get(self, context, ticket): + result_table = Table.from_arrays(self.cd.data, names=self.cd.names) + tkt = json.loads(ticket.ticket.decode('utf-8')) + for key in tkt.keys(): + tkt_data = [ + array([key]), + array([tkt[key]]), + array([-1.0]) + ] + result_table = concat_tables([result_table, Table.from_arrays(tkt_data, names=self.cd.names)]) + return RecordBatchStream(result_table, options=self.options) + + +class HeaderCheckServerMiddlewareFactory(ServerMiddlewareFactory): + """Factory to create HeaderCheckServerMiddleware and check header values""" + def start_call(self, info, headers): + auth_header = case_insensitive_header_lookup(headers, "Authorization") + values = auth_header[0].split(' ') + if values[0] != 'Bearer': + raise FlightUnauthenticatedError("Token required") + global req_headers + req_headers = headers + return HeaderCheckServerMiddleware(values[1]) + + +class HeaderCheckServerMiddleware(ServerMiddleware): + """ + Middleware needed to catch request headers via factory + N.B. As found in pyarrow tests + """ + def __init__(self, token, *args, **kwargs): + super().__init__(*args, **kwargs) + self.token = token + + def sending_headers(self): + return {'authorization': 'Bearer ' + self.token} + + +class HeaderCheckFlightServer(FlightServerBase): + """Mock server handle gRPC do_get calls""" + def do_get(self, context, ticket): + """Return something to avoid needless errors""" + data = [ + array([b"Vltava", struct.pack(' Date: Tue, 18 Mar 2025 14:58:29 +0100 Subject: [PATCH 06/15] test: adds test of async behavior of query_api.query_async() --- tests/test_query.py | 72 ++++++++++++++++++++++++++++++++++++++++++++- tests/util/mocks.py | 12 ++++++++ 2 files changed, 83 insertions(+), 1 deletion(-) diff --git a/tests/test_query.py b/tests/test_query.py index 6985cf9..8c0ad29 100644 --- a/tests/test_query.py +++ b/tests/test_query.py @@ -1,5 +1,8 @@ import asyncio import inspect +import sys +import time +import traceback import unittest import os import json @@ -17,6 +20,7 @@ from tests.util.mocks import ( ConstantData, ConstantFlightServer, + ConstantFlightServerDelayed, HeaderCheckFlightServer, HeaderCheckServerMiddlewareFactory, NoopAuthHandler, @@ -27,7 +31,11 @@ def asyncio_run(async_func): def wrapper(*args, **kwargs): - return asyncio.run(async_func(*args, **kwargs)) + try: + return asyncio.run(async_func(*args, **kwargs)) + except Exception as e: + print(traceback.format_exc(), file=sys.stderr) + raise e wrapper.__signature__ = inspect.signature(async_func) return wrapper @@ -370,3 +378,65 @@ async def test_query_async_table(self): assert {'data': 'database', 'reference': 'my_database', 'value': -1.0} in result_list assert {'data': 'sql_query', 'reference': 'SELECT * FROM data', 'value': -1.0} in result_list assert {'data': 'query_type', 'reference': 'sql', 'value': -1.0} in result_list + + @asyncio_run + async def test_query_async_delayed(self): + events = dict() + with ConstantFlightServerDelayed(delay=1) as server: + connection_string = f"grpc://localhost:{server.port}" + token = "my_token" + database = "my_database" + q_api = QueryApi( + connection_string=connection_string, + token=token, + flight_client_options={"generic_options": [('Foo', 'Bar')]}, + proxy=None, + options=None + ) + query = "SELECT * FROM data" + + # coroutine to handle query_async + async def local_query(query_api): + events['query_start'] = time.time_ns() + t_result = await query_api.query_async(query, "sql", "", database) + # t_result = query_api.query(query, "sql", "", database) + events['query_result'] = time.time_ns() + return t_result + + # second coroutine to run in "parallel" + async def fibo(iters): + events['fibo_start'] = time.time_ns() + await asyncio.sleep(0.5) + n0 = 1 + n1 = 1 + result = n1 + n0 + for _ in range(iters): + n0 = n1 + n1 = result + result = n1 + n0 + events['fibo_end'] = time.time_ns() + return result + + results = await asyncio.gather(local_query(q_api), fibo(50)) + + table = results[0] + fibo_num = results[1] + + # verify fibo calculation + assert fibo_num == 53316291173 + + # verify constant data + cd = ConstantData() + + result_list = table.to_pylist() + for item in cd.to_list(): + assert item in result_list + + # verify that fibo coroutine was processed while query_async was processing + # i.e. query call does not block event_loop + # fibo started after query_async + assert events['query_start'] < events['fibo_start'], (f"query_start: {events['query_start']} should start " + f"before fibo_start: {events['fibo_start']}") + # fibo ended before query_async + assert events['query_result'] > events['fibo_end'], (f"query_result: {events['query_result']} should occur " + f"after fibo_end: {events['fibo_end']}") diff --git a/tests/util/mocks.py b/tests/util/mocks.py index 33ebe2e..bb6bc24 100644 --- a/tests/util/mocks.py +++ b/tests/util/mocks.py @@ -1,5 +1,6 @@ import json import struct +import time from pyarrow import ( array, @@ -103,6 +104,17 @@ def do_get(self, context, ticket): return RecordBatchStream(result_table, options=self.options) +class ConstantFlightServerDelayed(ConstantFlightServer): + + def __init__(self, location=None, options=None, delay=0.5, **kwargs): + super().__init__(location, **kwargs) + self.delay = delay + + def do_get(self, context, ticket): + time.sleep(self.delay) + return super().do_get(context, ticket) + + class HeaderCheckServerMiddlewareFactory(ServerMiddlewareFactory): """Factory to create HeaderCheckServerMiddleware and check header values""" def start_call(self, info, headers): From 707a83a30d230a5423689feca373eed0e77d3ea2 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Tue, 18 Mar 2025 15:08:31 +0100 Subject: [PATCH 07/15] test: more relevant assert in new test --- tests/test_query.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/test_query.py b/tests/test_query.py index 8c0ad29..449977f 100644 --- a/tests/test_query.py +++ b/tests/test_query.py @@ -437,6 +437,10 @@ async def fibo(iters): # fibo started after query_async assert events['query_start'] < events['fibo_start'], (f"query_start: {events['query_start']} should start " f"before fibo_start: {events['fibo_start']}") + # fibo started before query_async ends - i.e. query_async did not block it + assert events['query_result'] > events['fibo_start'], (f"query_result: {events['query_result']} should " + f"occur after fibo_start: {events['fibo_start']}") + # fibo ended before query_async assert events['query_result'] > events['fibo_end'], (f"query_result: {events['query_result']} should occur " f"after fibo_end: {events['fibo_end']}") From 7a80a4e5347d42fa5948489ad7181a0ff7d48f06 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Tue, 18 Mar 2025 16:27:35 +0100 Subject: [PATCH 08/15] test: refactor - move asyncio_run helper method to utils/__init__.py --- tests/test_query.py | 16 +--------------- tests/util/__init__.py | 16 ++++++++++++++++ 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/tests/test_query.py b/tests/test_query.py index 449977f..8be7047 100644 --- a/tests/test_query.py +++ b/tests/test_query.py @@ -1,8 +1,5 @@ import asyncio -import inspect -import sys import time -import traceback import unittest import os import json @@ -16,6 +13,7 @@ from influxdb_client_3 import InfluxDBClient3 from influxdb_client_3.query.query_api import QueryApiOptionsBuilder, QueryApi from influxdb_client_3.version import USER_AGENT +from tests.util import asyncio_run from tests.util.mocks import ( ConstantData, @@ -29,18 +27,6 @@ ) -def asyncio_run(async_func): - def wrapper(*args, **kwargs): - try: - return asyncio.run(async_func(*args, **kwargs)) - except Exception as e: - print(traceback.format_exc(), file=sys.stderr) - raise e - - wrapper.__signature__ = inspect.signature(async_func) - return wrapper - - def case_insensitive_header_lookup(headers, lkey): """Lookup the value of a given key in the given headers. The lkey is case-insensitive. diff --git a/tests/util/__init__.py b/tests/util/__init__.py index 3bf511f..2ff706e 100644 --- a/tests/util/__init__.py +++ b/tests/util/__init__.py @@ -1 +1,17 @@ """Package for tests/util module.""" +import asyncio +import inspect +import sys +import traceback + + +def asyncio_run(async_func): + def wrapper(*args, **kwargs): + try: + return asyncio.run(async_func(*args, **kwargs)) + except Exception as e: + print(traceback.format_exc(), file=sys.stderr) + raise e + + wrapper.__signature__ = inspect.signature(async_func) + return wrapper From 299bc8374f81b0501f4ab0f545d22b4ffc533417 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Tue, 18 Mar 2025 17:22:01 +0100 Subject: [PATCH 09/15] test: adds new unit test for client.query_async() --- tests/test_influxdb_client_3.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/tests/test_influxdb_client_3.py b/tests/test_influxdb_client_3.py index 605edd3..1b52b76 100644 --- a/tests/test_influxdb_client_3.py +++ b/tests/test_influxdb_client_3.py @@ -2,6 +2,8 @@ from unittest.mock import patch from influxdb_client_3 import InfluxDBClient3 +from tests.util import asyncio_run +from tests.util.mocks import ConstantFlightServer, ConstantData class TestInfluxDBClient3(unittest.TestCase): @@ -48,6 +50,30 @@ def test_token_auth_scheme_explicit(self): ) self.assertEqual(client._client.auth_header_value, "my_scheme my_token") + @asyncio_run + async def test_query_async(self): + with ConstantFlightServer() as server: + client = InfluxDBClient3( + host=f"http://localhost:{server.port}", + org="my_org", + database="my_db", + token="my_token", + ) + + query = "SELECT * FROM my_data" + + table = await client.query_async(query=query, language="sql") + + result_list = table.to_pylist() + + cd = ConstantData() + for item in cd.to_list(): + assert item in result_list + + assert {'data': 'database', 'reference': 'my_db', 'value': -1.0} in result_list + assert {'data': 'sql_query', 'reference': query, 'value': -1.0} in result_list + assert {'data': 'query_type', 'reference': 'sql', 'value': -1.0} in result_list + if __name__ == '__main__': unittest.main() From a436699bd7b873789769ae60ca25da8f97709dce Mon Sep 17 00:00:00 2001 From: karel rehor Date: Wed, 19 Mar 2025 13:55:24 +0100 Subject: [PATCH 10/15] test: adds integration test for client.query_async() --- tests/test_influxdb_client_3_integration.py | 25 ++++++++++++ tests/util/__init__.py | 43 +++++++++++++++++++++ 2 files changed, 68 insertions(+) diff --git a/tests/test_influxdb_client_3_integration.py b/tests/test_influxdb_client_3_integration.py index 3a51cc6..48181f7 100644 --- a/tests/test_influxdb_client_3_integration.py +++ b/tests/test_influxdb_client_3_integration.py @@ -10,6 +10,7 @@ from pyarrow._flight import FlightError from influxdb_client_3 import InfluxDBClient3, InfluxDBError, write_client_options, WriteOptions +from tests.util import asyncio_run, lp_to_py_object def random_hex(len=6): @@ -249,3 +250,27 @@ def test_verify_ssl_false(self): assert len(list_results) > 0 finally: self.remove_test_cert(cert_file) + + @asyncio_run + async def test_verify_query_async(self): + measurement = f'test{random_hex(6)}' + data = [] + lp_template = "%s,location=%s val=%f,ival=%di,index=%di %d" + data_size = 10 + interval = 1_000_000_000 * 10 + ts = time.time_ns() - interval * data_size + locations = ['springfield', 'gotham', 'balbec', 'yonville'] + for i in range(data_size): + data.append(lp_template % (measurement, locations[random.randint(0, len(locations) - 1)], + random.random() * 10, + random.randint(0, 6), i, ts)) + ts = ts + interval + + self.client.write(data) + query = f"SELECT * FROM \"{measurement}\" ORDER BY time DESC" + + result = await self.client.query_async(query) + + result_list = result.to_pylist() + for item in data: + assert lp_to_py_object(item) in result_list, f"original lp data \"{item}\" should be in result list" diff --git a/tests/util/__init__.py b/tests/util/__init__.py index 2ff706e..8a17caf 100644 --- a/tests/util/__init__.py +++ b/tests/util/__init__.py @@ -4,6 +4,8 @@ import sys import traceback +import pandas + def asyncio_run(async_func): def wrapper(*args, **kwargs): @@ -15,3 +17,44 @@ def wrapper(*args, **kwargs): wrapper.__signature__ = inspect.signature(async_func) return wrapper + + +def lp_to_py_object(lp: str): + """ + Result format matches the format of objects returned in pyarrow.Table.to_pylist. + + For verifying test data returned from queries. + + :param lp: a lineprotocol formatted string + :return: a list object + """ + result = {} + groups = lp.split(' ') + + tags = groups[0].split(',') + tags.remove(tags[0]) + for tag in tags: + t_set = tag.split('=') + result[t_set[0]] = t_set[1] + + fields = groups[1].split(',') + for field in fields: + f_set = field.split('=') + lastchar = f_set[1][len(f_set[1]) - 1] + match lastchar: + case 'i': # integer + result[f_set[0]] = int(f_set[1].replace('i','')) + case 'u': # unsigned integer + result[f_set[0]] = int(f_set[1].replace('u','')) + case '"': # string + result[f_set[0]] = f_set[1].replace('"',"") + case 'e' | 'E' | 't' | 'T' | 'f' | 'F': + if f_set[1][0].lower() == 't': + result[f_set[0]] = True + else: + result[f_set[0]] = False + case _: # assume float + result[f_set[0]] = float(f_set[1]) + + result['time'] = pandas.Timestamp(int(groups[2])) + return result From 0559be454e4087bb0c89538e88174fe1a3e026b9 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Wed, 19 Mar 2025 14:25:24 +0100 Subject: [PATCH 11/15] test: replace match:case in util, which is supported only in 3.10+ --- tests/util/__init__.py | 41 ++++++++++++++++++++++++++--------------- 1 file changed, 26 insertions(+), 15 deletions(-) diff --git a/tests/util/__init__.py b/tests/util/__init__.py index 8a17caf..284b56f 100644 --- a/tests/util/__init__.py +++ b/tests/util/__init__.py @@ -38,23 +38,34 @@ def lp_to_py_object(lp: str): result[t_set[0]] = t_set[1] fields = groups[1].split(',') + + def check_bool(token): + if token.lower()[0] == 't': + return True + return False + + parse_field_val = { + 'i': lambda s: int(s.replace('i', '')), + 'u': lambda s: int(s.replace('u', '')), + '\"': lambda s: s.replace('"', ''), + 'e': lambda s: check_bool(s), + 'E': lambda s: check_bool(s), + 't': lambda s: check_bool(s), + 'T': lambda s: check_bool(s), + 'f': lambda s: check_bool(s), + 'F': lambda s: check_bool(s), + 'd': lambda s: float(s) + } + for field in fields: f_set = field.split('=') - lastchar = f_set[1][len(f_set[1]) - 1] - match lastchar: - case 'i': # integer - result[f_set[0]] = int(f_set[1].replace('i','')) - case 'u': # unsigned integer - result[f_set[0]] = int(f_set[1].replace('u','')) - case '"': # string - result[f_set[0]] = f_set[1].replace('"',"") - case 'e' | 'E' | 't' | 'T' | 'f' | 'F': - if f_set[1][0].lower() == 't': - result[f_set[0]] = True - else: - result[f_set[0]] = False - case _: # assume float - result[f_set[0]] = float(f_set[1]) + last_char = f_set[1][len(f_set[1]) - 1] + if last_char in '0123456789': + last_char = 'd' + if last_char in parse_field_val.keys(): + result[f_set[0]] = parse_field_val[last_char](f_set[1]) + else: + result[f_set[0]] = None result['time'] = pandas.Timestamp(int(groups[2])) return result From 87f079c5aba8c24b76603ca07950d5057fec6a3d Mon Sep 17 00:00:00 2001 From: karel rehor Date: Wed, 19 Mar 2025 15:15:22 +0100 Subject: [PATCH 12/15] docs: adds code documentation to key public methods --- influxdb_client_3/__init__.py | 17 +++++++++++++++++ influxdb_client_3/query/query_api.py | 15 +++++++++++++++ tests/util/__init__.py | 16 ++++++++++++++++ 3 files changed, 48 insertions(+) diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index de26941..753a83b 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -279,6 +279,23 @@ def query(self, query: str, language: str = "sql", mode: str = "all", database: raise e async def query_async(self, query: str, language: str = "sql", mode: str = "all", database: str = None, **kwargs): + """Query data from InfluxDB asynchronously. + + If you want to use query parameters, you can pass them as kwargs: + + >>> await client.query_async("select * from cpu where host=$host", query_parameters={"host": "server01"}) + + :param query: The query to execute on the database. + :param language: The query language to use. It should be one of "influxql" or "sql". Defaults to "sql". + :param mode: The mode to use for the query. It should be one of "all", "pandas", "polars", "chunk", + "reader" or "schema". Defaults to "all". + :param database: The database to query from. If not provided, uses the database provided during initialization. + :param kwargs: Additional arguments to pass to the ``FlightCallOptions headers``. For example, it can be used to + set up per request headers. + :keyword query_parameters: The query parameters to use in the query. + It should be a ``dictionary`` of key-value pairs. + :return: The query result in the specified mode. + """ if mode == "polars" and polars is False: raise ImportError("Polars is not installed. Please install it with `pip install polars`.") diff --git a/influxdb_client_3/query/query_api.py b/influxdb_client_3/query/query_api.py index fd05cf9..679803e 100644 --- a/influxdb_client_3/query/query_api.py +++ b/influxdb_client_3/query/query_api.py @@ -172,6 +172,21 @@ def query(self, query: str, language: str, mode: str, database: str, **kwargs): raise e async def query_async(self, query: str, language: str, mode: str, database: str, **kwargs): + """Query data from InfluxDB asynchronously. + + Wraps internal FlightClient.doGet call in its own executor, so that the event_loop will not be blocked. + + :param query: The query to execute on the database. + :param language: The query language. + :param mode: The mode to use for the query. + It should be one of "all", "pandas", "polars", "chunk", "reader" or "schema". + :param database: The database to query from. + :param kwargs: Additional arguments to pass to the ``FlightCallOptions headers``. + For example, it can be used to set up per request headers. + :keyword query_parameters: The query parameters to use in the query. + It should be a ``dictionary`` of key-value pairs. + :return: The query result in the specified mode. + """ try: ticket, options = self._prepare_query(query, language, database, **kwargs) loop = asyncio.get_running_loop() diff --git a/tests/util/__init__.py b/tests/util/__init__.py index 284b56f..10710fc 100644 --- a/tests/util/__init__.py +++ b/tests/util/__init__.py @@ -8,6 +8,22 @@ def asyncio_run(async_func): + """ + Fixture for running tests asynchronously + + Example + + .. sourcecode:: python + + @asyncio_run + async def test_my_feature(self): + asyncio.sleep(1) + print("waking...") + ... + + :param async_func: + :return: + """ def wrapper(*args, **kwargs): try: return asyncio.run(async_func(*args, **kwargs)) From 5e28213d555ce2e00c9c0cf8fa6c88b1f480ac2d Mon Sep 17 00:00:00 2001 From: karel rehor Date: Wed, 19 Mar 2025 17:32:10 +0100 Subject: [PATCH 13/15] docs: add example --- Examples/query_async.py | 87 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 87 insertions(+) create mode 100644 Examples/query_async.py diff --git a/Examples/query_async.py b/Examples/query_async.py new file mode 100644 index 0000000..e65a1e6 --- /dev/null +++ b/Examples/query_async.py @@ -0,0 +1,87 @@ +import asyncio +import random +import time + +import pandas + +from influxdb_client_3 import InfluxDBClient3 + +from config import Config + + +async def fibio(iterations, grit=0.5): + """ + example coroutine to run parallel with query_async + :param iterations: + :param grit: + :return: + """ + n0 = 1 + n1 = 1 + vals = [n0, n1] + for _ in range(iterations): + val = n0 + n1 + n0 = n1 + n1 = val + print(val) + vals.append(val) + await asyncio.sleep(grit) + return vals + + +def write_data(client: InfluxDBClient3, measurement): + """ + Synchronous write - only for preparing data + :param client: + :param measurement: + :return: + """ + ids = ['s3b1', 'dq41', 'sgw22'] + lp_template = f"{measurement},id=%s speed=%f,alt=%f,bearing=%f %d" + data_size = 10 + data = [] + interval = 10 * 1_000_000_000 + ts = time.time_ns() - (interval * data_size) + for _ in range(data_size): + data.append(lp_template % (ids[random.randint(0, len(ids) - 1)], + random.random() * 300, + random.random() * 2000, + random.random() * 360, ts)) + ts += interval + + client.write(data) + + +async def query_data(client: InfluxDBClient3, measurement): + """ + Query asynchronously - should not block other coroutines + :param client: + :param measurement: + :return: + """ + query = f"SELECT * FROM \"{measurement}\" WHERE time >= now() - interval '5 minutes' ORDER BY time DESC" + print(f"query start: {pandas.Timestamp(time.time_ns())}") + table = await client.query_async(query) + print(f"query returned: {pandas.Timestamp(time.time_ns())}") + return table.to_pandas() + + +async def main(): + config = Config() + client = InfluxDBClient3( + host=config.host, + token=config.token, + database=config.database, + org=config.org + ) + measurement = 'example_uav' + write_data(client, measurement) + + # run both coroutines simultaneously + result = await asyncio.gather(fibio(10, 0.2),query_data(client, measurement)) + print(f"fibio sequence = {result[0]}") + print(f"data set =\n{result[1]}") + + +if __name__ == "__main__": + asyncio.run(main()) From 97060053888c0f093522d8c95cf4f694adeee6d4 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Wed, 19 Mar 2025 17:39:05 +0100 Subject: [PATCH 14/15] docs: update CHANGELOG.md --- CHANGELOG.md | 4 ++++ Examples/query_async.py | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ab0df23..bd8ffa2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## 0.12.0 [unreleased] +### Features + +1. [123](https://github.com/InfluxCommunity/influxdb3-python/pull/123): Introduces `query_async` method. + ### Bug Fixes 1. [#121](https://github.com/InfluxCommunity/influxdb3-python/pull/121): Fix use of arguments `verify_ssl` and `ssl_ca_cert` in `QueryApi`. diff --git a/Examples/query_async.py b/Examples/query_async.py index e65a1e6..f6079f5 100644 --- a/Examples/query_async.py +++ b/Examples/query_async.py @@ -78,7 +78,7 @@ async def main(): write_data(client, measurement) # run both coroutines simultaneously - result = await asyncio.gather(fibio(10, 0.2),query_data(client, measurement)) + result = await asyncio.gather(fibio(10, 0.2), query_data(client, measurement)) print(f"fibio sequence = {result[0]}") print(f"data set =\n{result[1]}") From 5d35da86dd7e4d25849a2a1f4f9725ce36d0b87e Mon Sep 17 00:00:00 2001 From: karel rehor Date: Thu, 20 Mar 2025 16:35:00 +0100 Subject: [PATCH 15/15] chore: replace hard coded version in tests with USER_AGENT --- tests/test_query.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_query.py b/tests/test_query.py index 8be7047..85118e4 100644 --- a/tests/test_query.py +++ b/tests/test_query.py @@ -240,7 +240,7 @@ def test_multiple_flight_client_options(self): assert ('opt1', 'opt1 in args') in q_api._flight_client_options['generic_options'] assert ('optA', 'A in options') in q_api._flight_client_options['generic_options'] - assert (('grpc.secondary_user_agent', 'influxdb3-python/0.12.0dev0') in + assert (('grpc.secondary_user_agent', USER_AGENT) in q_api._flight_client_options['generic_options']) def test_override_secondary_user_agent_args(self): @@ -253,7 +253,7 @@ def test_override_secondary_user_agent_args(self): ) assert ('grpc.secondary_user_agent', 'my_custom_user_agent') in q_api._flight_client_options['generic_options'] - assert not (('grpc.secondary_user_agent', 'influxdb3-python/0.12.0dev0') in + assert not (('grpc.secondary_user_agent', USER_AGENT) in q_api._flight_client_options['generic_options']) def test_secondary_user_agent_in_options(self): @@ -274,7 +274,7 @@ def test_secondary_user_agent_in_options(self): assert ('optA', 'A in options') in q_api._flight_client_options['generic_options'] assert ('grpc.secondary_user_agent', 'my_custom_user_agent') in q_api._flight_client_options['generic_options'] - assert (('grpc.secondary_user_agent', 'influxdb3-python/0.12.0dev0') not in + assert (('grpc.secondary_user_agent', USER_AGENT) not in q_api._flight_client_options['generic_options']) def test_prepare_query(self):