Skip to content

Commit aa44d33

Browse files
committed
introduce line protocol version
1 parent 0f96d87 commit aa44d33

File tree

19 files changed

+487
-243
lines changed

19 files changed

+487
-243
lines changed

ci/run_all_tests.py

-2
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,6 @@ def main():
5151
'--', '--nocapture', cwd='questdb-rs')
5252
run_cmd('cargo', 'test', '--features=almost-all-features',
5353
'--', '--nocapture', cwd='questdb-rs')
54-
run_cmd('cargo', 'test', '--features=almost-all-features,protocol-version-1',
55-
'--', '--nocapture', cwd='questdb-rs')
5654
run_cmd(str(test_line_sender_path))
5755
run_cmd(str(test_line_sender_path_CXX20))
5856
#run_cmd('python3', str(system_test_path), 'run', '--versions', qdb_v, '-v')

cpp_test/test_line_sender.cpp

+17-8
Original file line numberDiff line numberDiff line change
@@ -694,13 +694,19 @@ TEST_CASE("os certs")
694694

695695
{
696696
questdb::ingress::opts opts{
697-
questdb::ingress::protocol::https, "localhost", server.port()};
697+
questdb::ingress::protocol::https,
698+
"localhost",
699+
server.port(),
700+
true};
698701
opts.tls_ca(questdb::ingress::ca::os_roots);
699702
}
700703

701704
{
702705
questdb::ingress::opts opts{
703-
questdb::ingress::protocol::https, "localhost", server.port()};
706+
questdb::ingress::protocol::https,
707+
"localhost",
708+
server.port(),
709+
true};
704710
opts.tls_ca(questdb::ingress::ca::webpki_and_os_roots);
705711
}
706712
}
@@ -729,9 +735,12 @@ TEST_CASE("Opts copy ctor, assignment and move testing.")
729735

730736
{
731737
questdb::ingress::opts opts1{
732-
questdb::ingress::protocol::https, "localhost", "9009"};
738+
questdb::ingress::protocol::https, "localhost", "9009", true};
733739
questdb::ingress::opts opts2{
734-
questdb::ingress::protocol::https, "altavista.digital.com", "9009"};
740+
questdb::ingress::protocol::https,
741+
"altavista.digital.com",
742+
"9009",
743+
true};
735744
opts1 = opts2;
736745
}
737746
}
@@ -901,15 +910,15 @@ TEST_CASE("Opts from conf")
901910
TEST_CASE("HTTP basics")
902911
{
903912
questdb::ingress::opts opts1{
904-
questdb::ingress::protocol::http, "localhost", 1};
913+
questdb::ingress::protocol::http, "localhost", 1, true};
905914
questdb::ingress::opts opts1conf = questdb::ingress::opts::from_conf(
906915
"http::addr=localhost:1;username=user;password=pass;request_timeout="
907-
"5000;retry_timeout=5;");
916+
"5000;retry_timeout=5;disable_line_protocol_validation=on;");
908917
questdb::ingress::opts opts2{
909-
questdb::ingress::protocol::https, "localhost", "1"};
918+
questdb::ingress::protocol::https, "localhost", "1", true};
910919
questdb::ingress::opts opts2conf = questdb::ingress::opts::from_conf(
911920
"http::addr=localhost:1;token=token;request_min_throughput=1000;retry_"
912-
"timeout=0;");
921+
"timeout=0;disable_line_protocol_validation=on;");
913922
opts1.username("user")
914923
.password("pass")
915924
.max_buf_size(1000000)

include/questdb/ingress/line_sender.h

+48
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,18 @@ typedef enum line_sender_protocol
9595
line_sender_protocol_https,
9696
} line_sender_protocol;
9797

98+
/** The line protocol version used to write data to buffer. */
99+
typedef enum line_protocol_version
100+
{
101+
/** Version 1 of InfluxDB Line Protocol.
102+
Uses text format serialization for f64. */
103+
line_protocol_version_1,
104+
105+
/** Version 2 of InfluxDB Line Protocol.
106+
Uses binary format serialization for f64, and support array data type.*/
107+
line_protocol_version_2,
108+
} line_protocol_version;
109+
98110
/** Possible sources of the root certificates used to validate the server's
99111
* TLS certificate. */
100112
typedef enum line_sender_ca
@@ -296,6 +308,23 @@ line_sender_buffer* line_sender_buffer_new();
296308
LINESENDER_API
297309
line_sender_buffer* line_sender_buffer_with_max_name_len(size_t max_name_len);
298310

311+
/**
312+
* Sets the Line Protocol version for line_sender_buffer.
313+
*
314+
* The buffer defaults is line_protocol_version_2 which uses
315+
* binary format f64 serialization and support array data type. Call this to
316+
* switch to version 1 (text format f64) when connecting to servers that don't
317+
* support line_protocol_version_2(under 8.3.2).
318+
*
319+
* Must be called before adding any data to the buffer. Protocol version cannot
320+
* be changed after the buffer contains data.
321+
*/
322+
LINESENDER_API
323+
line_sender_buffer* line_sender_buffer_set_line_protocol_version(
324+
line_sender_buffer* buffer,
325+
line_protocol_version version,
326+
line_sender_error** err_out);
327+
299328
/** Release the `line_sender_buffer` object. */
300329
LINESENDER_API
301330
void line_sender_buffer_free(line_sender_buffer* buffer);
@@ -884,6 +913,25 @@ line_sender* line_sender_from_conf(
884913
LINESENDER_API
885914
line_sender* line_sender_from_env(line_sender_error** err_out);
886915

916+
/**
917+
* Returns the client's recommended default line protocol version.
918+
* Will be used to [`line_sender_buffer_set_line_protocol_version`]
919+
*
920+
* The version selection follows these rules:
921+
* 1. TCP/TCPS Protocol: Always returns [`LineProtocolVersion::V2`]
922+
* 2. HTTP/HTTPS Protocol:
923+
* - If line protocol auto-detection is disabled
924+
* [`line_sender_opts_disable_line_protocol_validation`], returns
925+
* [`LineProtocolVersion::V2`]
926+
* - If line protocol auto-detection is enabled:
927+
* - Uses the server's default version if supported by the client
928+
* - Otherwise uses the highest mutually supported version from the
929+
* intersection of client and server compatible versions.
930+
*/
931+
LINESENDER_API
932+
line_protocol_version line_sender_default_line_protocol_version(
933+
line_sender* sender);
934+
887935
/**
888936
* Tell whether the sender is no longer usable and must be closed.
889937
* This happens when there was an earlier failure.

include/questdb/ingress/line_sender.hpp

+77-6
Original file line numberDiff line numberDiff line change
@@ -401,17 +401,30 @@ class line_sender_buffer
401401
{
402402
}
403403

404+
line_sender_buffer(
405+
size_t init_buf_size,
406+
size_t max_name_len,
407+
line_protocol_version version) noexcept
408+
: _impl{nullptr}
409+
, _init_buf_size{init_buf_size}
410+
, _max_name_len{max_name_len}
411+
, _line_protocol_version{version}
412+
{
413+
}
414+
404415
line_sender_buffer(const line_sender_buffer& other) noexcept
405416
: _impl{::line_sender_buffer_clone(other._impl)}
406417
, _init_buf_size{other._init_buf_size}
407418
, _max_name_len{other._max_name_len}
419+
, _line_protocol_version{other._line_protocol_version}
408420
{
409421
}
410422

411423
line_sender_buffer(line_sender_buffer&& other) noexcept
412424
: _impl{other._impl}
413425
, _init_buf_size{other._init_buf_size}
414426
, _max_name_len{other._max_name_len}
427+
, _line_protocol_version{other._line_protocol_version}
415428
{
416429
other._impl = nullptr;
417430
}
@@ -427,6 +440,7 @@ class line_sender_buffer
427440
_impl = nullptr;
428441
_init_buf_size = other._init_buf_size;
429442
_max_name_len = other._max_name_len;
443+
_line_protocol_version = other._line_protocol_version;
430444
}
431445
return *this;
432446
}
@@ -439,11 +453,32 @@ class line_sender_buffer
439453
_impl = other._impl;
440454
_init_buf_size = other._init_buf_size;
441455
_max_name_len = other._max_name_len;
456+
_line_protocol_version = other._line_protocol_version;
442457
other._impl = nullptr;
443458
}
444459
return *this;
445460
}
446461

462+
/**
463+
* Sets the Line Protocol version for line_sender_buffer.
464+
*
465+
* The buffer defaults is line_protocol_version_2 which uses
466+
* binary format f64 serialization and support array data type. Call this to
467+
* switch to version 1 (text format f64) when connecting to servers that
468+
* don't support line_protocol_version_2(under 8.3.2).
469+
*
470+
* Must be called before adding any data to the buffer. Protocol version
471+
* cannot be changed after the buffer contains data.
472+
*/
473+
line_sender_buffer& set_line_protocol_version(line_protocol_version v)
474+
{
475+
may_init();
476+
line_sender_error::wrapped_call(
477+
::line_sender_buffer_set_line_protocol_version, _impl, v);
478+
_line_protocol_version = v;
479+
return *this;
480+
}
481+
447482
/**
448483
* Pre-allocate to ensure the buffer has enough capacity for at least
449484
* the specified additional byte count. This may be rounded up.
@@ -803,12 +838,17 @@ class line_sender_buffer
803838
{
804839
_impl = ::line_sender_buffer_with_max_name_len(_max_name_len);
805840
::line_sender_buffer_reserve(_impl, _init_buf_size);
841+
line_sender_error::wrapped_call(
842+
line_sender_buffer_set_line_protocol_version,
843+
_impl,
844+
_line_protocol_version);
806845
}
807846
}
808847

809848
::line_sender_buffer* _impl;
810849
size_t _init_buf_size;
811850
size_t _max_name_len;
851+
line_protocol_version _line_protocol_version{::line_protocol_version_2};
812852

813853
friend class line_sender;
814854
};
@@ -868,13 +908,24 @@ class opts
868908
* @param[in] protocol The protocol to use.
869909
* @param[in] host The QuestDB database host.
870910
* @param[in] port The QuestDB tcp or http port.
911+
* @param[in] disable_line_protocol_validation disable line protocol version
912+
* validation.
871913
*/
872-
opts(protocol protocol, utf8_view host, uint16_t port) noexcept
914+
opts(
915+
protocol protocol,
916+
utf8_view host,
917+
uint16_t port,
918+
bool disable_line_protocol_validation = false) noexcept
873919
: _impl{::line_sender_opts_new(
874920
static_cast<::line_sender_protocol>(protocol), host._impl, port)}
875921
{
876922
line_sender_error::wrapped_call(
877923
::line_sender_opts_user_agent, _impl, _user_agent::name());
924+
if (disable_line_protocol_validation)
925+
{
926+
line_sender_error::wrapped_call(
927+
::line_sender_opts_disable_line_protocol_validation, _impl);
928+
}
878929
}
879930

880931
/**
@@ -883,15 +934,25 @@ class opts
883934
* @param[in] protocol The protocol to use.
884935
* @param[in] host The QuestDB database host.
885936
* @param[in] port The QuestDB tcp or http port as service name.
937+
* @param[in] disable_line_protocol_validation disable line protocol version
886938
*/
887-
opts(protocol protocol, utf8_view host, utf8_view port) noexcept
939+
opts(
940+
protocol protocol,
941+
utf8_view host,
942+
utf8_view port,
943+
bool disable_line_protocol_validation = false) noexcept
888944
: _impl{::line_sender_opts_new_service(
889945
static_cast<::line_sender_protocol>(protocol),
890946
host._impl,
891947
port._impl)}
892948
{
893949
line_sender_error::wrapped_call(
894950
::line_sender_opts_user_agent, _impl, _user_agent::name());
951+
if (disable_line_protocol_validation)
952+
{
953+
line_sender_error::wrapped_call(
954+
::line_sender_opts_disable_line_protocol_validation, _impl);
955+
}
895956
}
896957

897958
opts(const opts& other) noexcept
@@ -1194,13 +1255,23 @@ class line_sender
11941255
return {opts::from_env()};
11951256
}
11961257

1197-
line_sender(protocol protocol, utf8_view host, uint16_t port)
1198-
: line_sender{opts{protocol, host, port}}
1258+
line_sender(
1259+
protocol protocol,
1260+
utf8_view host,
1261+
uint16_t port,
1262+
bool disable_line_protocol_validation = false)
1263+
: line_sender{
1264+
opts{protocol, host, port, disable_line_protocol_validation}}
11991265
{
12001266
}
12011267

1202-
line_sender(protocol protocol, utf8_view host, utf8_view port)
1203-
: line_sender{opts{protocol, host, port}}
1268+
line_sender(
1269+
protocol protocol,
1270+
utf8_view host,
1271+
utf8_view port,
1272+
bool disable_line_protocol_validation = false)
1273+
: line_sender{
1274+
opts{protocol, host, port, disable_line_protocol_validation}}
12041275
{
12051276
}
12061277

0 commit comments

Comments
 (0)