getdozer/clickhouse.rs

clickhouse.rs

A typed client for ClickHouse.

Crates.io Documentation MIT licensed Build Status

  • Uses serde for encoding/decoding rows.
  • Supports serde attributes: skip_serializing, skip_deserializing, rename.
  • Uses RowBinary encoding over HTTP transport.
    • There are plans to switch to Native over TCP.
  • Supports TLS.
  • Supports compression and decompression (LZ4 and LZ4HC).
  • Provides API for selecting.
  • Provides API for inserting.
  • Provides API for infinite transactional (see below) inserting.
  • Provides API for watching live views.
  • Provides mocks for unit testing.

Note: ch2rs is useful to generate a row type from ClickHouse.

Usage

To use the crate, add this to your Cargo.toml:

[dependencies]
clickhouse = "0.11.6"

[dev-dependencies]
clickhouse = { version = "0.11.6", features = ["test-util"] }

Note about ClickHouse prior to v22.6

ClickHouse servers older than v22.6 (2022-06-16) handle RowBinary incorrectly in some rare cases. To work around this, use version 0.11 of the crate and enable the wa-37420 feature. Don't enable it for newer server versions.

Create a client

use clickhouse::Client;

let client = Client::default()
    .with_url("http://localhost:8123")
    .with_user("name")
    .with_password("123")
    .with_database("test");
  • Reuse created clients or clone them in order to reuse a connection pool.

Select rows

use serde::Deserialize;
use clickhouse::Row;

#[derive(Row, Deserialize)]
struct MyRow<'a> {
    no: u32,
    name: &'a str,
}

let mut cursor = client
    .query("SELECT ?fields FROM some WHERE no BETWEEN ? AND ?")
    .bind(500)
    .bind(504)
    .fetch::<MyRow<'_>>()?;

while let Some(row) = cursor.next().await? { .. }
  • Placeholder ?fields is replaced with no, name (fields of Row).
  • Placeholder ? is replaced with values in following bind() calls.
  • The convenience methods fetch_one::<Row>() and fetch_all::<Row>() return the first row or all rows, respectively.
  • sql::Identifier can be used to bind table names.
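
To make the two placeholder rules above concrete, here is a minimal sketch of the substitution that uses only the standard library. It is not the crate's implementation: render_query is a hypothetical helper, the bound values are assumed to be already rendered as SQL literals, and escaping (of strings or of literal question marks) is ignored.

```rust
/// Hypothetical helper (not part of the `clickhouse` crate): expands `?fields`
/// into a comma-separated column list and every other `?` into the next
/// pre-rendered argument, in order.
fn render_query(template: &str, fields: &[&str], args: &[&str]) -> Result<String, String> {
    let mut out = String::with_capacity(template.len());
    let mut rest = template;
    let mut next_arg = args.iter();

    while let Some(pos) = rest.find('?') {
        // Copy everything before the placeholder, then skip the `?` itself.
        out.push_str(&rest[..pos]);
        rest = &rest[pos + 1..];

        if let Some(tail) = rest.strip_prefix("fields") {
            // `?fields` becomes the list of row field names.
            out.push_str(&fields.join(", "));
            rest = tail;
        } else {
            // A plain `?` consumes the next bound value.
            let arg = next_arg
                .next()
                .ok_or("more `?` placeholders than bound values")?;
            out.push_str(arg);
        }
    }
    out.push_str(rest);

    if next_arg.next().is_some() {
        return Err("more bound values than `?` placeholders".to_string());
    }
    Ok(out)
}

fn main() {
    let sql = render_query(
        "SELECT ?fields FROM some WHERE no BETWEEN ? AND ?",
        &["no", "name"],
        &["500", "504"],
    );
    println!("{:?}", sql);
}
```

The sketch only illustrates the ordering rule: values are consumed left to right, one per `?`, and a mismatch in count is an error.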

Note that cursors can return an error even after producing some rows. To avoid this, use client.with_option("wait_end_of_query", "1") in order to enable buffering on the server-side. More details. The buffer_size option can be useful too.

Insert a batch

use serde::Serialize;
use clickhouse::Row;

#[derive(Row, Serialize)]
struct MyRow {
    no: u32,
    name: String,
}

let mut insert = client.insert("some")?;
insert.write(&MyRow { no: 0, name: "foo".into() }).await?;
insert.write(&MyRow { no: 1, name: "bar".into() }).await?;
insert.end().await?;
  • If end() isn't called, the INSERT is aborted.
  • Rows are sent progressively to spread network load.
  • ClickHouse inserts batches atomically only if all rows fit in the same partition and their number is less than max_insert_block_size.
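
If atomicity matters, one option is to plan batches on the client so that each one satisfies the condition in the last point. The sketch below uses only the standard library; plan_batches and the (month, no) row shape are hypothetical, and the row limit is passed in as a parameter rather than read from the server.

```rust
use std::collections::BTreeMap;

/// Hypothetical helper: groups rows by partition key and splits each group
/// into batches that stay strictly below `max_block_rows`, so every batch
/// touches exactly one partition.
fn plan_batches<T, K, F>(rows: Vec<T>, partition_of: F, max_block_rows: usize) -> Vec<Vec<T>>
where
    K: Ord,
    F: Fn(&T) -> K,
{
    assert!(max_block_rows > 1, "the limit must leave room for at least one row");
    let limit = max_block_rows - 1; // "less than", not "less than or equal"

    // BTreeMap keeps the partitions in a deterministic order.
    let mut by_partition: BTreeMap<K, Vec<T>> = BTreeMap::new();
    for row in rows {
        by_partition.entry(partition_of(&row)).or_default().push(row);
    }

    let mut batches = Vec::new();
    for (_, group) in by_partition {
        let mut group = group.into_iter().peekable();
        while group.peek().is_some() {
            batches.push(group.by_ref().take(limit).collect());
        }
    }
    batches
}

fn main() {
    // Rows are (month, no); the table is assumed to be partitioned by month.
    let rows = vec![(202401u32, 1u32), (202402, 2), (202401, 3), (202401, 4)];
    // A tiny limit is used here only to make the splitting visible.
    for batch in plan_batches(rows, |row| row.0, 3) {
        println!("{:?}", batch);
    }
}
```

Each resulting batch would then be written with its own insert and finished with end().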

Infinite inserting

use std::time::Duration;

let mut inserter = client.inserter("some")?
    .with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(20)))
    .with_max_bytes(50_000_000)
    .with_max_rows(750_000)
    .with_period(Some(Duration::from_secs(15)));

inserter.write(&MyRow { no: 0, name: "foo".into() })?;
inserter.write(&MyRow { no: 1, name: "bar".into() })?;
let stats = inserter.commit().await?;
if stats.rows > 0 {
    println!(
        "{} bytes, {} rows, {} transactions have been inserted",
        stats.bytes, stats.rows, stats.transactions,
    );
}
  • Inserter ends an active insert in commit() if thresholds (max_bytes, max_rows, period) are reached.
  • The interval between ending active INSERTs can be biased by using with_period_bias to avoid load spikes caused by parallel inserters.
  • Inserter::time_left() can be used to detect when the current period ends. Call Inserter::commit() again to check limits.
  • All rows between commit() calls are inserted in the same INSERT statement.
  • To terminate inserting, do not forget to flush the remaining rows by calling end():
inserter.end().await?;
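
As a mental model of the threshold check described above, the following standard-library-only sketch decides whether an active INSERT should be ended. Limits and Pending are hypothetical types, not the crate's own, and treating the comparisons as inclusive is an assumption.

```rust
use std::time::Duration;

/// Hypothetical model of the limits configured via `with_max_bytes`,
/// `with_max_rows` and `with_period`.
struct Limits {
    max_bytes: u64,
    max_rows: u64,
    period: Option<Duration>,
}

/// What has been written since the active INSERT started.
struct Pending {
    bytes: u64,
    rows: u64,
    elapsed: Duration,
}

impl Limits {
    /// Returns true if any single threshold has been reached.
    /// A period of `None` means there is no time limit.
    fn is_reached(&self, pending: &Pending) -> bool {
        pending.bytes >= self.max_bytes
            || pending.rows >= self.max_rows
            || self.period.map_or(false, |period| pending.elapsed >= period)
    }
}

fn main() {
    // Same thresholds as in the snippet above.
    let limits = Limits {
        max_bytes: 50_000_000,
        max_rows: 750_000,
        period: Some(Duration::from_secs(15)),
    };
    let pending = Pending {
        bytes: 1_024,
        rows: 2,
        elapsed: Duration::from_secs(3),
    };
    println!("end the INSERT now: {}", limits.is_reached(&pending));
}
```

Under this model, calling commit() before any threshold is reached leaves the INSERT open, so the returned stats report zero rows.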

Perform DDL

client.query("DROP TABLE IF EXISTS some").execute().await?;

Live views

Requires the watch feature.

let mut cursor = client
    .watch("SELECT max(no), argMax(name, no) FROM some")
    .fetch::<Row<'_>>()?;

let (version, row) = cursor.next().await?.unwrap();
println!("live view updated: version={}, row={:?}", version, row);

// Use `only_events()` to iterate over versions only.
let mut cursor = client.watch("some_live_view").limit(20).only_events().fetch()?;
println!("live view updated: version={:?}", cursor.next().await?);
  • Use carefully.
  • This code reuses, or creates if it doesn't exist, a temporary live view named lv_{sha1(query)}, so that parallel watchers share the same live view.
  • You can specify a name instead of a query.
  • This API uses JSONEachRowWithProgress under the hood because of the issue.
  • Only struct rows can be used. Avoid fetch::<u64>() and other types without named fields.
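
The naming scheme lv_{sha1(query)} means that identical queries map to the same live view name, which is what lets parallel watchers share one view. The sketch below illustrates this with a small standard-library SHA-1. The live_view_name helper is hypothetical, and lowercase hex encoding of the digest is an assumption, so the crate may format the name differently.

```rust
/// Plain SHA-1 over a byte slice (standard algorithm, no external crates).
fn sha1(data: &[u8]) -> [u8; 20] {
    let mut h: [u32; 5] = [0x67452301, 0xEFCDAB89, 0x98BADCFE, 0x10325476, 0xC3D2E1F0];

    // Padding: 0x80, zeros up to 56 mod 64, then the bit length (big-endian).
    let mut msg = data.to_vec();
    msg.push(0x80);
    while msg.len() % 64 != 56 {
        msg.push(0);
    }
    msg.extend_from_slice(&((data.len() as u64) * 8).to_be_bytes());

    for chunk in msg.chunks(64) {
        let mut w = [0u32; 80];
        for i in 0..16 {
            w[i] = u32::from_be_bytes([chunk[4 * i], chunk[4 * i + 1], chunk[4 * i + 2], chunk[4 * i + 3]]);
        }
        for i in 16..80 {
            w[i] = (w[i - 3] ^ w[i - 8] ^ w[i - 14] ^ w[i - 16]).rotate_left(1);
        }

        let [mut a, mut b, mut c, mut d, mut e] = h;
        for i in 0..80 {
            let (f, k): (u32, u32) = match i {
                0..=19 => ((b & c) | (!b & d), 0x5A827999),
                20..=39 => (b ^ c ^ d, 0x6ED9EBA1),
                40..=59 => ((b & c) | (b & d) | (c & d), 0x8F1BBCDC),
                _ => (b ^ c ^ d, 0xCA62C1D6),
            };
            let temp = a
                .rotate_left(5)
                .wrapping_add(f)
                .wrapping_add(e)
                .wrapping_add(k)
                .wrapping_add(w[i]);
            e = d;
            d = c;
            c = b.rotate_left(30);
            b = a;
            a = temp;
        }
        for (state, value) in h.iter_mut().zip([a, b, c, d, e]) {
            *state = state.wrapping_add(value);
        }
    }

    let mut out = [0u8; 20];
    for (i, word) in h.iter().enumerate() {
        out[4 * i..4 * i + 4].copy_from_slice(&word.to_be_bytes());
    }
    out
}

/// Lowercase hex encoding.
fn hex(bytes: &[u8]) -> String {
    bytes.iter().map(|b| format!("{:02x}", b)).collect()
}

/// Hypothetical helper: derives a deterministic live view name from a query.
fn live_view_name(query: &str) -> String {
    format!("lv_{}", hex(&sha1(query.as_bytes())))
}

fn main() {
    let query = "SELECT max(no), argMax(name, no) FROM some";
    // Two watchers issuing the same query text get the same name.
    println!("{}", live_view_name(query));
    println!("{}", live_view_name(query));
}
```

Note that under this scheme the name depends on the exact query text, so queries that differ only in whitespace would produce different names.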

See examples.

Feature Flags

  • lz4 (enabled by default) — enables Compression::Lz4 and Compression::Lz4Hc(_) variants. If enabled, Compression::Lz4 is used by default for all queries except for WATCH.
  • tls (enabled by default) — supports urls with the HTTPS schema.
  • quanta (enabled by default) - uses the quanta crate to speed the inserter up. Not used if test-util is enabled (thus, time can be managed by tokio::time::advance() in custom tests).
  • test-util — adds mocks. See the example. Use it only in dev-dependencies.
  • watch — enables client.watch functionality. See the corresponding section for details.
  • uuid — adds serde::uuid to work with uuid crate.
  • time — adds serde::time to work with time crate.

Data Types

  • (U)Int(8|16|32|64|128) maps to/from corresponding (u|i)(8|16|32|64|128) types or newtypes around them.

  • (U)Int256 aren't supported directly, but there is a workaround for it.

  • Float(32|64) maps to/from corresponding f(32|64) or newtypes around them.

  • Decimal(32|64|128) maps to/from corresponding i(32|64|128) or newtypes around them. It's more convenient to use fixnum or another implementation of signed fixed-point numbers.

  • Boolean maps to/from bool or newtypes around it.

  • String maps to/from any string or bytes types, e.g. &str, &[u8], String, Vec<u8> or SmartString. Newtypes are also supported. To store bytes, consider using serde_bytes, because it's more efficient.

    Example
    #[derive(Row, Debug, Serialize, Deserialize)]
    struct MyRow<'a> {
        str: &'a str,
        string: String,
        #[serde(with = "serde_bytes")]
        bytes: Vec<u8>,
        #[serde(with = "serde_bytes")]
        byte_slice: &'a [u8],
    }
  • FixedString(_) isn't supported yet.

  • Enum(8|16) are supported using serde_repr.

    Example
    use serde_repr::{Deserialize_repr, Serialize_repr};
    
    #[derive(Row, Serialize, Deserialize)]
    struct MyRow {
        level: Level,
    }
    
    #[derive(Debug, Serialize_repr, Deserialize_repr)]
    #[repr(u8)]
    enum Level {
        Debug = 1,
        Info = 2,
        Warn = 3,
        Error = 4,
    }
  • UUID maps to/from uuid::Uuid by using serde::uuid. Requires the uuid feature.

    Example
    #[derive(Row, Serialize, Deserialize)]
    struct MyRow {
        #[serde(with = "clickhouse::serde::uuid")]
        uuid: uuid::Uuid,
    }
  • IPv6 maps to/from std::net::Ipv6Addr.

  • IPv4 maps to/from std::net::Ipv4Addr by using serde::ipv4.

    Example
    #[derive(Row, Serialize, Deserialize)]
    struct MyRow {
        #[serde(with = "clickhouse::serde::ipv4")]
        ipv4: std::net::Ipv4Addr,
    }
  • Date maps to/from u16 or a newtype around it and represents the number of days elapsed since 1970-01-01. time::Date is also supported via serde::time::date, which requires the time feature.

    Example
    #[derive(Row, Serialize, Deserialize)]
    struct MyRow {
        days: u16,
        #[serde(with = "clickhouse::serde::time::date")]
        date: time::Date,
    }
  • Date32 maps to/from i32 or a newtype around it and represents the number of days elapsed since 1970-01-01. time::Date is also supported via serde::time::date32, which requires the time feature.

    Example
    #[derive(Row, Serialize, Deserialize)]
    struct MyRow {
        days: i32,
        #[serde(with = "clickhouse::serde::time::date32")]
        date: time::Date,
    }
  • DateTime maps to/from u32 or a newtype around it and represents the number of seconds elapsed since the UNIX epoch. time::OffsetDateTime is also supported via serde::time::datetime, which requires the time feature.

    Example
    #[derive(Row, Serialize, Deserialize)]
    struct MyRow {
        ts: u32,
        #[serde(with = "clickhouse::serde::time::datetime")]
        dt: time::OffsetDateTime,
    }
  • DateTime64(_) maps to/from i64 or a newtype around it and represents the time elapsed since the UNIX epoch, in seconds, milliseconds, microseconds or nanoseconds depending on the precision. time::OffsetDateTime is also supported via serde::time::datetime64::*, which requires the time feature.

    Example
    #[derive(Row, Serialize, Deserialize)]
    struct MyRow {
        ts: i64, // elapsed s/us/ms/ns depending on `DateTime64(X)`
        #[serde(with = "clickhouse::serde::time::datetime64::secs")]
        dt64s: time::OffsetDateTime,  // `DateTime64(0)`
        #[serde(with = "clickhouse::serde::time::datetime64::millis")]
        dt64ms: time::OffsetDateTime, // `DateTime64(3)`
        #[serde(with = "clickhouse::serde::time::datetime64::micros")]
        dt64us: time::OffsetDateTime, // `DateTime64(6)`
        #[serde(with = "clickhouse::serde::time::datetime64::nanos")]
        dt64ns: time::OffsetDateTime, // `DateTime64(9)`
    }
  • Tuple(A, B, ...) maps to/from (A, B, ...) or a newtype around it.

  • Array(_) maps to/from any slice, e.g. Vec<_>, &[_]. Newtypes are also supported.

  • Map(K, V) behaves like Array((K, V)).

  • LowCardinality(_) is supported seamlessly.

  • Nullable(_) maps to/from Option<_>. For clickhouse::serde::* helpers add ::option.

    Example
    #[derive(Row, Serialize, Deserialize)]
    struct MyRow {
        #[serde(with = "clickhouse::serde::ipv4::option")]
        ipv4_opt: Option<std::net::Ipv4Addr>,
    }
  • Nested is supported by providing multiple arrays with renaming.

    Example
    // CREATE TABLE test(items Nested(name String, count UInt32))
    #[derive(Row, Serialize, Deserialize)]
    struct MyRow {
        #[serde(rename = "items.name")]
        items_name: Vec<String>,
        #[serde(rename = "items.count")]
        items_count: Vec<u32>,
    }
  • JSON and Geo aren't supported for now.
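
When the raw integer representations of Date, Date32 and DateTime64 are used instead of the time crate, the values have to be interpreted by hand. The following standard-library-only sketch shows one way to do that. Both functions are hypothetical helpers rather than crate API: days_to_ymd uses the well-known civil-from-days algorithm for the proleptic Gregorian calendar, and split_datetime64 assumes a precision between 0 and 9.

```rust
/// Converts days since 1970-01-01 (the raw `Date` / `Date32` value) into a
/// (year, month, day) triple of the proleptic Gregorian calendar.
fn days_to_ymd(days: i64) -> (i64, u32, u32) {
    let z = days + 719_468; // shift the epoch to 0000-03-01
    let era = z.div_euclid(146_097); // number of 400-year cycles
    let doe = z.rem_euclid(146_097); // day of era, 0..=146096
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365; // year of era
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day of year, March-based
    let mp = (5 * doy + 2) / 153; // month index, 0 = March
    let day = (doy - (153 * mp + 2) / 5 + 1) as u32;
    let month = (if mp < 10 { mp + 3 } else { mp - 9 }) as u32;
    // January and February belong to the next civil year.
    let year = yoe + era * 400 + i64::from(month <= 2);
    (year, month, day)
}

/// Splits a raw `DateTime64(precision)` value into whole seconds since the
/// UNIX epoch and the sub-second part in nanoseconds.
fn split_datetime64(value: i64, precision: u32) -> (i64, u32) {
    assert!(precision <= 9, "precision is expected to be in 0..=9");
    let ticks_per_sec = 10_i64.pow(precision);
    // Euclidean division keeps the fractional part non-negative before 1970.
    let secs = value.div_euclid(ticks_per_sec);
    let ticks = value.rem_euclid(ticks_per_sec);
    let nanos = ticks * 10_i64.pow(9 - precision);
    (secs, nanos as u32)
}

fn main() {
    // A `Date` column value read as u16.
    let days: u16 = 19_723;
    println!("{:?}", days_to_ymd(i64::from(days)));

    // A `DateTime64(3)` column value read as i64 (milliseconds).
    println!("{:?}", split_datetime64(1_700_000_000_123, 3));
}
```

The same days_to_ymd function covers Date32, since negative day counts (dates before 1970) are handled by the Euclidean division.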

Mocking

The crate provides utilities for mocking a ClickHouse server and testing DDL, SELECT, INSERT and WATCH queries.

The functionality can be enabled with the test-util feature. Use it only in dev-dependencies.

See the example.
