From d94ea0a7d3da060e0f8ae4bd3144b6e23551ccc9 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Mon, 7 Oct 2024 10:18:46 +0200 Subject: [PATCH 1/4] fix: (WIP) issue 669 write precision to default in async API --- influxdb_client/client/write_api_async.py | 26 ++-- tests/test_InfluxDBClientAsync.py | 149 ++++++++++++++++++++-- 2 files changed, 154 insertions(+), 21 deletions(-) diff --git a/influxdb_client/client/write_api_async.py b/influxdb_client/client/write_api_async.py index e9e2018b..421fbf80 100644 --- a/influxdb_client/client/write_api_async.py +++ b/influxdb_client/client/write_api_async.py @@ -1,5 +1,6 @@ """Collect and async write time series data to InfluxDB Cloud or InfluxDB OSS.""" import logging +from asyncio import ensure_future, gather from collections import defaultdict from typing import Union, Iterable, NamedTuple @@ -114,12 +115,19 @@ async def write(self, bucket: str, org: str = None, self._append_default_tags(record) payloads = defaultdict(list) - self._serialize(record, write_precision, payloads, precision_from_point=False, **kwargs) - - # joint list by \n - body = b'\n'.join(payloads[write_precision]) - response = await self._write_service.post_write_async(org=org, bucket=bucket, body=body, - precision=write_precision, async_req=False, - _return_http_data_only=False, - content_type="text/plain; charset=utf-8") - return response[1] in (201, 204) + self._serialize(record, write_precision, payloads, precision_from_point=True, **kwargs) + + futures = [] + for payload in payloads.items(): + futures.append(ensure_future(self._write_service.post_write_async(org=org, bucket=bucket, + body=b'\n'.join(payload[1]), + precision=payload[0], async_req=False, + _return_http_data_only=False, + content_type="text/plain; charset=utf-8"))) + + results = await gather(*futures, return_exceptions=True) + for result in results: + if isinstance(result, Exception): + raise result + + return False not in [re[1] in (201,204) for re in results] diff --git a/tests/test_InfluxDBClientAsync.py b/tests/test_InfluxDBClientAsync.py index 7f8c6214..cb0586b9 100644 --- a/tests/test_InfluxDBClientAsync.py +++ b/tests/test_InfluxDBClientAsync.py @@ -1,11 +1,15 @@ import asyncio +import dateutil.parser import logging +import math import re +import time import unittest import os from datetime import datetime, timezone from io import StringIO +import pandas import pytest import warnings from aioresponses import aioresponses @@ -199,30 +203,151 @@ async def test_write_empty_data(self): self.assertEqual(True, response) + def gen_fractional_utc(self, nano, precision) -> str: + raw_sec = nano / 1_000_000_000 + if precision == WritePrecision.NS: + rem = f"{nano % 1_000_000_000}".rjust(9,"0").rstrip("0") + return (datetime.fromtimestamp(math.floor(raw_sec), tz=timezone.utc) + .isoformat() + .replace("+00:00", "") + f".{rem}Z") + #f".{rem}Z")) + elif precision == WritePrecision.US: + # rem = f"{round(nano / 1_000) % 1_000_000}"#.ljust(6,"0") + return (datetime.fromtimestamp(round(raw_sec,6), tz=timezone.utc) + .isoformat() + .replace("+00:00","") + .strip("0") + "Z" + ) + elif precision == WritePrecision.MS: + #rem = f"{round(nano / 1_000_000) % 1_000}".rjust(3, "0") + return (datetime.fromtimestamp(round(raw_sec,3), tz=timezone.utc) + .isoformat() + .replace("+00:00","") + .strip("0") + "Z" + ) + elif precision == WritePrecision.S: + return (datetime.fromtimestamp(round(raw_sec), tz=timezone.utc) + .isoformat() + .replace("+00:00","Z")) + else: + raise ValueError(f"Unknown precision: {precision}") + + 
@async_test async def test_write_points_different_precision(self): + now_ns = time.time_ns() + now_us = now_ns / 1_000 + now_ms = now_us / 1_000 + now_s = now_ms / 1_000 + + now_date_s = self.gen_fractional_utc(now_ns, WritePrecision.S) + now_date_ms = self.gen_fractional_utc(now_ns, WritePrecision.MS) + now_date_us = self.gen_fractional_utc(now_ns, WritePrecision.US) + now_date_ns = self.gen_fractional_utc(now_ns, WritePrecision.NS) + + points = { + WritePrecision.S: [], + WritePrecision.MS: [], + WritePrecision.US: [], + WritePrecision.NS: [] + } + + expected = {} + measurement = generate_name("measurement") - _point1 = Point(measurement).tag("location", "Prague").field("temperature", 25.3) \ - .time(datetime.fromtimestamp(0, tz=timezone.utc), write_precision=WritePrecision.S) - _point2 = Point(measurement).tag("location", "New York").field("temperature", 24.3) \ - .time(datetime.fromtimestamp(1, tz=timezone.utc), write_precision=WritePrecision.MS) - _point3 = Point(measurement).tag("location", "Berlin").field("temperature", 24.3) \ - .time(datetime.fromtimestamp(2, tz=timezone.utc), write_precision=WritePrecision.NS) - await self.client.write_api().write(bucket="my-bucket", record=[_point1, _point2, _point3], + # basic date-time value + points[WritePrecision.S].append(Point(measurement).tag("method", "SecDateTime").field("temperature", 25.3) \ + .time(datetime.fromtimestamp(round(now_s), tz=timezone.utc), write_precision=WritePrecision.S)) + expected['SecDateTime'] = now_date_s + points[WritePrecision.MS].append(Point(measurement).tag("method", "MilDateTime").field("temperature", 24.3) \ + .time(datetime.fromtimestamp(round(now_s,3), tz=timezone.utc), write_precision=WritePrecision.MS)) + expected['MilDateTime'] = now_date_ms + points[WritePrecision.US].append(Point(measurement).tag("method", "MicDateTime").field("temperature", 24.3) \ + .time(datetime.fromtimestamp(round(now_s,6), tz=timezone.utc), write_precision=WritePrecision.US)) + expected['MicDateTime'] = now_date_us + # N.B. datetime does not handle nanoseconds +# points[WritePrecision.NS].append(Point(measurement).tag("method", "NanDateTime").field("temperature", 24.3) \ +# .time(datetime.fromtimestamp(now_s, tz=timezone.utc), write_precision=WritePrecision.NS)) + + # long timestamps based on POSIX time + points[WritePrecision.S].append(Point(measurement).tag("method", "SecPosix").field("temperature", 24.3) \ + .time(round(now_s), write_precision=WritePrecision.S)) + expected['SecPosix'] = now_date_s + points[WritePrecision.MS].append(Point(measurement).tag("method", "MilPosix").field("temperature", 24.3) \ + .time(round(now_ms), write_precision=WritePrecision.MS)) + expected['MilPosix'] = now_date_ms + points[WritePrecision.US].append(Point(measurement).tag("method", "MicPosix").field("temperature", 24.3) \ + .time(round(now_us), write_precision=WritePrecision.US)) + expected['MicPosix'] = now_date_us + points[WritePrecision.NS].append(Point(measurement).tag("method", "NanPosix").field("temperature", 24.3) \ + .time(now_ns, write_precision=WritePrecision.NS)) + expected['NanPosix'] = now_date_ns + + # ISO Zulu datetime with ms, us and ns e.g. 
"2024-09-27T13:17:16.412399728Z" + points[WritePrecision.S].append(Point(measurement).tag("method", "SecDTZulu").field("temperature", 24.3) \ + .time(now_date_s, write_precision=WritePrecision.S)) + expected['SecDTZulu'] = now_date_s + points[WritePrecision.MS].append(Point(measurement).tag("method", "MilDTZulu").field("temperature", 24.3) \ + .time(now_date_ms, write_precision=WritePrecision.MS)) + expected['MilDTZulu'] = now_date_ms + points[WritePrecision.US].append(Point(measurement).tag("method", "MicDTZulu").field("temperature", 24.3) \ + .time(now_date_us, write_precision=WritePrecision.US)) + expected['MicDTZulu'] = now_date_us + # This keeps resulting in micro second resolution in response +# points[WritePrecision.NS].append(Point(measurement).tag("method", "NanDTZulu").field("temperature", 24.3) \ +# .time(now_date_ns, write_precision=WritePrecision.NS)) + + recs = [x for x in [v for v in points.values()]] + + await self.client.write_api().write(bucket="my-bucket", record=recs, write_precision=WritePrecision.NS) query = f''' from(bucket:"my-bucket") |> range(start: 0) |> filter(fn: (r) => r["_measurement"] == "{measurement}") - |> keep(columns: ["_time"]) + |> keep(columns: ["method","_time"]) ''' query_api = self.client.query_api() + # ensure calls fully processed on server + await asyncio.sleep(1) + raw = await query_api.query_raw(query) - self.assertEqual(8, len(raw.splitlines())) - self.assertEqual(',,0,1970-01-01T00:00:02Z', raw.splitlines()[4]) - self.assertEqual(',,0,1970-01-01T00:00:01Z', raw.splitlines()[5]) - self.assertEqual(',,0,1970-01-01T00:00:00Z', raw.splitlines()[6]) + linesRaw = raw.splitlines()[4:] + + lines = [] + for lnr in linesRaw: + lines.append(lnr[2:].split(",")) + + def get_time_for_method(lines, method): + for l in lines: + if l[2] == method: + return l[1] + return "" + + self.assertEqual(15, len(raw.splitlines())) + + for key in expected: + t = get_time_for_method(lines,key) + comp_time = dateutil.parser.isoparse(get_time_for_method(lines,key)) + target_time = dateutil.parser.isoparse(expected[key]) + self.assertEqual(target_time.date(), comp_time.date()) + self.assertEqual(target_time.hour, comp_time.hour) + self.assertEqual(target_time.second,comp_time.second) + dif = abs(target_time.microsecond - comp_time.microsecond) + if key[:3] == "Sec": + # Already tested + pass + elif key[:3] == "Mil": + # may be slight rounding differences + self.assertLess(dif, 1500, f"failed to match timestamp for {key} {target_time} != {comp_time}") + elif key[:3] == "Mic": + # may be slight rounding differences + self.assertLess(dif, 150, f"failed to match timestamp for {key} {target_time} != {comp_time}") + elif key[:3] == "Nan": + self.assertEqual(expected[key], get_time_for_method(lines, key)) + else: + raise Exception(f"Unhandled key {key}") @async_test async def test_delete_api(self): From 0c4ddb53a4ec619408f356b9742b9017d74128ca Mon Sep 17 00:00:00 2001 From: karel rehor Date: Mon, 7 Oct 2024 10:34:28 +0200 Subject: [PATCH 2/4] chore: fix lint issues --- influxdb_client/client/write_api_async.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/influxdb_client/client/write_api_async.py b/influxdb_client/client/write_api_async.py index 421fbf80..70bbab51 100644 --- a/influxdb_client/client/write_api_async.py +++ b/influxdb_client/client/write_api_async.py @@ -119,15 +119,16 @@ async def write(self, bucket: str, org: str = None, futures = [] for payload in payloads.items(): - 
futures.append(ensure_future(self._write_service.post_write_async(org=org, bucket=bucket, - body=b'\n'.join(payload[1]), - precision=payload[0], async_req=False, - _return_http_data_only=False, - content_type="text/plain; charset=utf-8"))) + futures.append(ensure_future + (self._write_service.post_write_async(org=org, bucket=bucket, + body=b'\n'.join(payload[1]), + precision=payload[0], async_req=False, + _return_http_data_only=False, + content_type="text/plain; charset=utf-8"))) results = await gather(*futures, return_exceptions=True) for result in results: if isinstance(result, Exception): raise result - return False not in [re[1] in (201,204) for re in results] + return False not in [re[1] in (201, 204) for re in results] From 942d717a9a9e62fb84c876631b2d80daf7f6c86d Mon Sep 17 00:00:00 2001 From: karel rehor Date: Tue, 8 Oct 2024 09:56:57 +0200 Subject: [PATCH 3/4] docs: update CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6937e242..6f879dd0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ### Bug Fixes 1. [#672](https://github.com/influxdata/influxdb-client-python/pull/672): Adding type validation to url attribute in client object +2. [#675](https://github.com/influxdata/influxdb-client-python/pull/675): Ensures WritePrecision in Point is preferred to `DEFAULT_PRECISION` ## 1.46.0 [2024-09-13] From b4d741e49a11bfa3f8be6d64ef822288b6ad7f09 Mon Sep 17 00:00:00 2001 From: karel rehor Date: Tue, 8 Oct 2024 15:31:13 +0200 Subject: [PATCH 4/4] chore: improve indexing of range --- influxdb_client/client/write_api_async.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/influxdb_client/client/write_api_async.py b/influxdb_client/client/write_api_async.py index 70bbab51..38937eca 100644 --- a/influxdb_client/client/write_api_async.py +++ b/influxdb_client/client/write_api_async.py @@ -118,11 +118,11 @@ async def write(self, bucket: str, org: str = None, self._serialize(record, write_precision, payloads, precision_from_point=True, **kwargs) futures = [] - for payload in payloads.items(): + for payload_precision, payload_line in payloads.items(): futures.append(ensure_future (self._write_service.post_write_async(org=org, bucket=bucket, - body=b'\n'.join(payload[1]), - precision=payload[0], async_req=False, + body=b'\n'.join(payload_line), + precision=payload_precision, async_req=False, _return_http_data_only=False, content_type="text/plain; charset=utf-8")))
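
For reference, a minimal usage sketch of the behaviour this patch series enables (not part of the diff itself; the URL, token, org, bucket and timestamp values below are placeholder assumptions): once each Point's own WritePrecision is preferred over the call-level default, points carrying different precisions are serialized into one line-protocol payload per precision and posted as separate concurrent requests, and the async write call returns True only when every request finishes with HTTP 201/204.

    # Usage sketch only -- not part of the patch. Connection details are placeholders.
    import asyncio

    from influxdb_client import Point, WritePrecision
    from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync


    async def main():
        async with InfluxDBClientAsync(url="http://localhost:8086",
                                       token="my-token", org="my-org") as client:
            # Two points with different per-point precisions.
            p_sec = Point("async_precision_demo").tag("method", "SecPosix") \
                .field("temperature", 24.3) \
                .time(1727442000, write_precision=WritePrecision.S)
            p_ns = Point("async_precision_demo").tag("method", "NanPosix") \
                .field("temperature", 25.3) \
                .time(1727442000123456789, write_precision=WritePrecision.NS)

            # Before this fix both points were written with the call-level default
            # precision; with the fix they are grouped into one payload per precision
            # and posted concurrently (asyncio.gather). The result is True only if
            # every underlying request returned 201 or 204.
            ok = await client.write_api().write(bucket="my-bucket", record=[p_sec, p_ns])
            print(ok)


    asyncio.run(main())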