diff --git a/Cargo.lock b/Cargo.lock index 7a5dabdef624..288bf187489c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4497,6 +4497,7 @@ dependencies = [ "async-trait", "byteorder", "bytes", + "chrono", "common-base", "common-config", "common-error", @@ -4508,6 +4509,7 @@ dependencies = [ "dashmap", "futures", "futures-util", + "itertools 0.10.5", "protobuf", "protobuf-build", "raft-engine", diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 2110bd832e54..627fb9cccaa8 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -51,7 +51,8 @@ sync_write = false # Kafka wal options, see `standalone.example.toml`. # broker_endpoints = ["127.0.0.1:9092"] -# max_batch_size = "4MB" +# Warning: Kafka has a default limit of 1MB per message in a topic. +# max_batch_size = "1MB" # linger = "200ms" # consumer_wait_timeout = "100ms" # backoff_init = "500ms" diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 486b2e8fd9d7..24412443cb75 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -108,7 +108,8 @@ provider = "raft_engine" # replication_factor = 1 # The max size of a single producer batch. -# max_batch_size = "4MB" +# Warning: Kafka has a default limit of 1MB per message in a topic. +# max_batch_size = "1MB" # The linger duration. # linger = "200ms" # The consumer wait timeout. diff --git a/src/common/config/src/wal.rs b/src/common/config/src/wal.rs index 6f0a9867804c..0b951a942662 100644 --- a/src/common/config/src/wal.rs +++ b/src/common/config/src/wal.rs @@ -90,9 +90,10 @@ mod tests { #[test] fn test_serde_kafka_config() { + // With all fields. let toml_str = r#" broker_endpoints = ["127.0.0.1:9092"] - max_batch_size = "4MB" + max_batch_size = "1MB" linger = "200ms" consumer_wait_timeout = "100ms" backoff_init = "500ms" @@ -104,7 +105,7 @@ mod tests { let expected = KafkaConfig { broker_endpoints: vec!["127.0.0.1:9092".to_string()], compression: RsKafkaCompression::default(), - max_batch_size: ReadableSize::mb(4), + max_batch_size: ReadableSize::mb(1), linger: Duration::from_millis(200), consumer_wait_timeout: Duration::from_millis(100), backoff: KafkaBackoffConfig { @@ -115,6 +116,19 @@ mod tests { }, }; assert_eq!(decoded, expected); + + // With some fields missing. + let toml_str = r#" + broker_endpoints = ["127.0.0.1:9092"] + linger = "200ms" + "#; + let decoded: KafkaConfig = toml::from_str(toml_str).unwrap(); + let expected = KafkaConfig { + broker_endpoints: vec!["127.0.0.1:9092".to_string()], + linger: Duration::from_millis(200), + ..Default::default() + }; + assert_eq!(decoded, expected); } #[test] diff --git a/src/common/config/src/wal/kafka.rs b/src/common/config/src/wal/kafka.rs index d510e973451c..884e3f3f0a99 100644 --- a/src/common/config/src/wal/kafka.rs +++ b/src/common/config/src/wal/kafka.rs @@ -60,7 +60,8 @@ impl Default for KafkaConfig { Self { broker_endpoints: vec!["127.0.0.1:9092".to_string()], compression: RsKafkaCompression::NoCompression, - max_batch_size: ReadableSize::mb(4), + // Warning: Kafka has a default limit of 1MB per message in a topic. 
+ max_batch_size: ReadableSize::mb(1), linger: Duration::from_millis(200), consumer_wait_timeout: Duration::from_millis(100), backoff: KafkaBackoffConfig::default(), diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml index 7941cd11e918..dd85e2933c22 100644 --- a/src/log-store/Cargo.toml +++ b/src/log-store/Cargo.toml @@ -14,6 +14,7 @@ async-stream.workspace = true async-trait.workspace = true byteorder = "1.4" bytes.workspace = true +chrono.workspace = true common-base.workspace = true common-config.workspace = true common-error.workspace = true @@ -24,6 +25,7 @@ common-telemetry.workspace = true dashmap.workspace = true futures-util.workspace = true futures.workspace = true +itertools.workspace = true protobuf = { version = "2", features = ["bytes"] } raft-engine.workspace = true rskafka.workspace = true diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs index 7f475e2076a8..b3f8b5d08585 100644 --- a/src/log-store/src/error.rs +++ b/src/log-store/src/error.rs @@ -18,6 +18,7 @@ use common_config::wal::KafkaWalTopic; use common_error::ext::ErrorExt; use common_macro::stack_trace_debug; use common_runtime::error::Error as RuntimeError; +use serde_json::error::Error as JsonError; use snafu::{Location, Snafu}; use crate::kafka::NamespaceImpl as KafkaNamespace; @@ -123,20 +124,6 @@ pub enum Error { error: String, }, - #[snafu(display("Failed to encode a record meta"))] - EncodeMeta { - location: Location, - #[snafu(source)] - error: serde_json::Error, - }, - - #[snafu(display("Failed to decode a record meta"))] - DecodeMeta { - location: Location, - #[snafu(source)] - error: serde_json::Error, - }, - #[snafu(display("Missing required key in a record"))] MissingKey { location: Location }, @@ -146,9 +133,16 @@ pub enum Error { #[snafu(display("Cannot build a record from empty entries"))] EmptyEntries { location: Location }, - #[snafu(display("Failed to produce records to Kafka, topic: {}", topic))] + #[snafu(display( + "Failed to produce records to Kafka, topic: {}, size: {}, limit: {}", + topic, + size, + limit, + ))] ProduceRecord { topic: KafkaWalTopic, + size: usize, + limit: usize, location: Location, #[snafu(source)] error: rskafka::client::producer::Error, @@ -172,6 +166,23 @@ pub enum Error { #[snafu(display("Failed to do a cast"))] Cast { location: Location }, + + #[snafu(display("Failed to encode object into json"))] + EncodeJson { + location: Location, + #[snafu(source)] + error: JsonError, + }, + + #[snafu(display("Failed to decode object from json"))] + DecodeJson { + location: Location, + #[snafu(source)] + error: JsonError, + }, + + #[snafu(display("The record sequence is not legal, error: {}", error))] + IllegalSequence { location: Location, error: String }, } impl ErrorExt for Error { diff --git a/src/log-store/src/kafka.rs b/src/log-store/src/kafka.rs index fefa7823c5c2..ab8556572e8b 100644 --- a/src/log-store/src/kafka.rs +++ b/src/log-store/src/kafka.rs @@ -12,10 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod client_manager; +pub(crate) mod client_manager; pub mod log_store; -mod offset; -mod record_utils; +pub(crate) mod util; use std::fmt::Display; @@ -29,8 +28,8 @@ use crate::error::Error; /// Kafka Namespace implementation. 
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)] pub struct NamespaceImpl { - region_id: u64, - topic: Topic, + pub region_id: u64, + pub topic: Topic, } impl Namespace for NamespaceImpl { diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs index cd2f705c4db9..19ae75deffcb 100644 --- a/src/log-store/src/kafka/client_manager.rs +++ b/src/log-store/src/kafka/client_manager.rs @@ -62,7 +62,7 @@ impl Client { /// Manages client construction and accesses. #[derive(Debug)] pub(crate) struct ClientManager { - config: KafkaConfig, + pub(crate) config: KafkaConfig, /// Top-level client in kafka. All clients are constructed by this client. client_factory: RsKafkaClient, /// A pool maintaining a collection of clients. diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs index 2e5543341506..ddac8c6b752f 100644 --- a/src/log-store/src/kafka/log_store.rs +++ b/src/log-store/src/kafka/log_store.rs @@ -26,10 +26,10 @@ use store_api::logstore::entry_stream::SendableEntryStream; use store_api::logstore::namespace::Id as NamespaceId; use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore}; -use crate::error::{ConsumeRecordSnafu, Error, GetOffsetSnafu, Result}; +use crate::error::{ConsumeRecordSnafu, Error, GetOffsetSnafu, IllegalSequenceSnafu, Result}; use crate::kafka::client_manager::{ClientManager, ClientManagerRef}; -use crate::kafka::offset::Offset; -use crate::kafka::record_utils::{decode_from_record, RecordProducer}; +use crate::kafka::util::offset::Offset; +use crate::kafka::util::record::{maybe_emit_entry, Record, RecordProducer}; use crate::kafka::{EntryImpl, NamespaceImpl}; /// A log store backed by Kafka. @@ -85,8 +85,6 @@ impl LogStore for KafkaLogStore { /// Appends a batch of entries and returns a response containing a map where the key is a region id /// while the value is the id of the last successfully written entry of the region. async fn append_batch(&self, entries: Vec) -> Result { - debug!("LogStore handles append_batch with entries {:?}", entries); - if entries.is_empty() { return Ok(AppendBatchResponse::default()); } @@ -96,7 +94,7 @@ impl LogStore for KafkaLogStore { for entry in entries { producers .entry(entry.ns.region_id) - .or_insert(RecordProducer::new(entry.ns.clone())) + .or_insert_with(|| RecordProducer::new(entry.ns.clone())) .push(entry); } @@ -115,8 +113,6 @@ impl LogStore for KafkaLogStore { .into_iter() .collect::>(); - debug!("Append batch result: {:?}", last_entry_ids); - Ok(AppendBatchResponse { last_entry_ids }) } @@ -127,13 +123,10 @@ impl LogStore for KafkaLogStore { ns: &Self::Namespace, entry_id: EntryId, ) -> Result> { - let topic = ns.topic.clone(); - let region_id = ns.region_id; - // Gets the client associated with the topic. let client = self .client_manager - .get_or_insert(&topic) + .get_or_insert(&ns.topic) .await? .raw_client .clone(); @@ -148,13 +141,13 @@ impl LogStore for KafkaLogStore { .context(GetOffsetSnafu { ns: ns.clone() })? - 1; // Reads entries with offsets in the range [start_offset, end_offset). - let start_offset = Offset::try_from(entry_id)?.0; + let start_offset: i64 = Offset::try_from(entry_id)?.0; // Abort if there're no new entries. // FIXME(niebayes): how come this case happens? 
if start_offset > end_offset { warn!( - "No new entries for ns {} in range [{}, {})", + "No new entries for ns {} in range [{}, {}]", ns, start_offset, end_offset ); return Ok(futures_util::stream::empty().boxed()); @@ -166,44 +159,52 @@ impl LogStore for KafkaLogStore { .build(); debug!( - "Built a stream consumer for ns {} to consume entries in range [{}, {})", + "Built a stream consumer for ns {} to consume entries in range [{}, {}]", ns, start_offset, end_offset ); + // Key: entry id, Value: the records associated with the entry. + let mut entry_records: HashMap<_, Vec<_>> = HashMap::new(); let ns_clone = ns.clone(); let stream = async_stream::stream!({ while let Some(consume_result) = stream_consumer.next().await { - // Each next will prdoce a `RecordAndOffset` and a high watermark offset. + // Each next on the stream consumer produces a `RecordAndOffset` and a high watermark offset. // The `RecordAndOffset` contains the record data and its start offset. - // The high watermark offset is the end offset of the latest record in the partition. - let (record, high_watermark) = consume_result.context(ConsumeRecordSnafu { - ns: ns_clone.clone(), - })?; - let record_offset = record.offset; + // The high watermark offset is the offset of the last record plus one. + let (record_and_offset, high_watermark) = + consume_result.with_context(|_| ConsumeRecordSnafu { + ns: ns_clone.clone(), + })?; + let (kafka_record, offset) = (record_and_offset.record, record_and_offset.offset); + debug!( "Read a record at offset {} for ns {}, high watermark: {}", - record_offset, ns_clone, high_watermark + offset, ns_clone, high_watermark ); - // Ignores noop records. - if record.record.value.is_none() { + // Ignores no-op records. + if kafka_record.value.is_none() { + if check_termination(offset, end_offset, &entry_records)? { + break; + } + continue; + } + + // Filters records by namespace. + let record = Record::try_from(kafka_record)?; + if record.meta.ns != ns_clone { + if check_termination(offset, end_offset, &entry_records)? { + break; + } continue; } - let entries = decode_from_record(record.record)?; - // Filters entries by region id. - if let Some(entry) = entries.first() - && entry.ns.region_id == region_id - { - yield Ok(entries); + // Tries to construct an entry from records consumed so far. + if let Some(entry) = maybe_emit_entry(record, &mut entry_records)? { + yield Ok(vec![entry]); } - // Terminates the stream if the entry with the end offset was read. - if record_offset >= end_offset { - debug!( - "Stream consumer for ns {} terminates at offset {}", - ns_clone, record_offset - ); + if check_termination(offset, end_offset, &entry_records)? { break; } } @@ -252,3 +253,24 @@ impl LogStore for KafkaLogStore { Ok(()) } } + +fn check_termination( + offset: i64, + end_offset: i64, + entry_records: &HashMap>, +) -> Result { + // Terminates the stream if the entry with the end offset was read. + if offset >= end_offset { + debug!("Stream consumer terminates at offset {}", offset); + // There must have no records when the stream terminates. 
+ if !entry_records.is_empty() { + return IllegalSequenceSnafu { + error: "Found records leftover", + } + .fail(); + } + Ok(true) + } else { + Ok(false) + } +} diff --git a/src/log-store/src/kafka/record_utils.rs b/src/log-store/src/kafka/record_utils.rs deleted file mode 100644 index 3707b873f3e3..000000000000 --- a/src/log-store/src/kafka/record_utils.rs +++ /dev/null @@ -1,188 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use rskafka::record::Record; -use serde::{Deserialize, Serialize}; -use snafu::{ensure, OptionExt, ResultExt}; - -use crate::error::{ - DecodeMetaSnafu, EmptyEntriesSnafu, EncodeMetaSnafu, GetClientSnafu, MissingKeySnafu, - MissingValueSnafu, ProduceRecordSnafu, Result, -}; -use crate::kafka::client_manager::ClientManagerRef; -use crate::kafka::offset::Offset; -use crate::kafka::{EntryId, EntryImpl, NamespaceImpl}; - -/// Record metadata which will be serialized/deserialized to/from the `key` of a Record. -#[derive(Debug, Serialize, Deserialize, PartialEq)] -struct RecordMeta { - /// Meta version. Used for backward compatibility. - version: u32, - /// The namespace of the entries wrapped in the record. - ns: NamespaceImpl, - /// Ids of the entries built into the record. - entry_ids: Vec, - /// entry_offsets[i] is the end offset (exclusive) of the data of the i-th entry in the record value. - entry_offsets: Vec, -} - -impl RecordMeta { - fn new(ns: NamespaceImpl, entries: &[EntryImpl]) -> Self { - Self { - version: 0, - ns, - entry_ids: entries.iter().map(|entry| entry.id).collect(), - entry_offsets: entries - .iter() - .map(|entry| entry.data.len()) - .scan(0, |presum, x| { - *presum += x; - Some(*presum) - }) - .collect(), - } - } -} - -/// Produces a record to a kafka topic. -pub(crate) struct RecordProducer { - /// The namespace of the entries. - ns: NamespaceImpl, - /// Entries are buffered before being built into a record. - entries: Vec, -} - -impl RecordProducer { - /// Creates a new producer for producing entries with the given namespace. - pub(crate) fn new(ns: NamespaceImpl) -> Self { - Self { - ns, - entries: Vec::new(), - } - } - - /// Populates the entry buffer with the given entries. - pub(crate) fn with_entries(self, entries: Vec) -> Self { - Self { entries, ..self } - } - - /// Pushes an entry into the entry buffer. - pub(crate) fn push(&mut self, entry: EntryImpl) { - self.entries.push(entry); - } - - /// Produces the buffered entries to kafka sever as a kafka record. - /// Returns the kafka offset of the produced record. - // TODO(niebayes): since the total size of a region's entries may be way-too large, - // the producer may need to support splitting entries into multiple records. - pub(crate) async fn produce(self, client_manager: &ClientManagerRef) -> Result { - ensure!(!self.entries.is_empty(), EmptyEntriesSnafu); - - // Produces the record through a client. The client determines when to send the record to kafka server. 
- let client = client_manager - .get_or_insert(&self.ns.topic) - .await - .map_err(|e| { - GetClientSnafu { - topic: &self.ns.topic, - error: e.to_string(), - } - .build() - })?; - client - .producer - .produce(encode_to_record(self.ns.clone(), self.entries)?) - .await - .map(Offset) - .context(ProduceRecordSnafu { - topic: &self.ns.topic, - }) - } -} - -fn encode_to_record(ns: NamespaceImpl, entries: Vec) -> Result { - let meta = RecordMeta::new(ns, &entries); - let data = entries.into_iter().flat_map(|entry| entry.data).collect(); - Ok(Record { - key: Some(serde_json::to_vec(&meta).context(EncodeMetaSnafu)?), - value: Some(data), - timestamp: rskafka::chrono::Utc::now(), - headers: Default::default(), - }) -} - -pub(crate) fn decode_from_record(record: Record) -> Result> { - let key = record.key.context(MissingKeySnafu)?; - let value = record.value.context(MissingValueSnafu)?; - let meta: RecordMeta = serde_json::from_slice(&key).context(DecodeMetaSnafu)?; - - let mut entries = Vec::with_capacity(meta.entry_ids.len()); - let mut start_offset = 0; - for (i, end_offset) in meta.entry_offsets.iter().enumerate() { - entries.push(EntryImpl { - // TODO(niebayes): try to avoid the clone. - data: value[start_offset..*end_offset].to_vec(), - id: meta.entry_ids[i], - ns: meta.ns.clone(), - }); - start_offset = *end_offset; - } - Ok(entries) -} - -#[cfg(test)] -mod tests { - use super::*; - - fn new_test_entry>(data: D, entry_id: EntryId, ns: NamespaceImpl) -> EntryImpl { - EntryImpl { - data: data.as_ref().to_vec(), - id: entry_id, - ns, - } - } - - #[test] - fn test_serde_record_meta() { - let ns = NamespaceImpl { - region_id: 1, - topic: "test_topic".to_string(), - }; - let entries = vec![ - new_test_entry(b"111", 1, ns.clone()), - new_test_entry(b"2222", 2, ns.clone()), - new_test_entry(b"33333", 3, ns.clone()), - ]; - let meta = RecordMeta::new(ns, &entries); - let encoded = serde_json::to_vec(&meta).unwrap(); - let decoded: RecordMeta = serde_json::from_slice(&encoded).unwrap(); - assert_eq!(meta, decoded); - } - - #[test] - fn test_encdec_record() { - let ns = NamespaceImpl { - region_id: 1, - topic: "test_topic".to_string(), - }; - let entries = vec![ - new_test_entry(b"111", 1, ns.clone()), - new_test_entry(b"2222", 2, ns.clone()), - new_test_entry(b"33333", 3, ns.clone()), - ]; - let record = encode_to_record(ns, entries.clone()).unwrap(); - let decoded_entries = decode_from_record(record).unwrap(); - assert_eq!(entries, decoded_entries); - } -} diff --git a/src/log-store/src/kafka/util.rs b/src/log-store/src/kafka/util.rs new file mode 100644 index 000000000000..61059b16451f --- /dev/null +++ b/src/log-store/src/kafka/util.rs @@ -0,0 +1,16 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +pub mod offset; +pub mod record; diff --git a/src/log-store/src/kafka/offset.rs b/src/log-store/src/kafka/util/offset.rs similarity index 100% rename from src/log-store/src/kafka/offset.rs rename to src/log-store/src/kafka/util/offset.rs diff --git a/src/log-store/src/kafka/util/record.rs b/src/log-store/src/kafka/util/record.rs new file mode 100644 index 000000000000..7d45165514a7 --- /dev/null +++ b/src/log-store/src/kafka/util/record.rs @@ -0,0 +1,564 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use rskafka::record::Record as KafkaRecord; +use serde::{Deserialize, Serialize}; +use snafu::{ensure, OptionExt, ResultExt}; + +use crate::error::{ + DecodeJsonSnafu, EmptyEntriesSnafu, EncodeJsonSnafu, GetClientSnafu, IllegalSequenceSnafu, + MissingKeySnafu, MissingValueSnafu, ProduceRecordSnafu, Result, +}; +use crate::kafka::client_manager::ClientManagerRef; +use crate::kafka::util::offset::Offset; +use crate::kafka::{EntryId, EntryImpl, NamespaceImpl}; + +/// The current version of Record. +pub(crate) const VERSION: u32 = 0; + +/// The estimated size in bytes of a serialized RecordMeta. +/// A record is guaranteed to have sizeof(meta) + sizeof(data) <= max_batch_size - ESTIMATED_META_SIZE. +const ESTIMATED_META_SIZE: usize = 256; + +/// The type of a record. +/// +/// - If the entry is able to fit into a Kafka record, it's converted into a Full record. +/// +/// - If the entry is too large to fit into a Kafka record, it's converted into a collection of records. +/// Those records must contain exactly one First record and one Last record, and potentially several +/// Middle records. There may be no Middle record. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)] +pub enum RecordType { + /// The record is self-contained, i.e. an entry's data is fully stored into this record. + Full, + /// The record contains the first part of an entry's data. + First, + /// The record contains one of the middle parts of an entry's data. + /// The sequence of the record is identified by the inner field. + Middle(usize), + /// The record contains the last part of an entry's data. + Last, +} + +/// The metadata of a record. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct RecordMeta { + /// The version of the record. Used for backward compatibility. + version: u32, + /// The type of the record. + pub tp: RecordType, + /// The id of the entry the record associated with. + pub entry_id: EntryId, + /// The namespace of the entry the record associated with. + pub ns: NamespaceImpl, +} + +/// The minimal storage unit in the Kafka log store. +/// +/// An entry will be first converted into several Records before producing. +/// If an entry is able to fit into a KafkaRecord, it converts to a single Record. +/// If otherwise an entry cannot fit into a KafkaRecord, it will be split into a collection of Records. 
+/// +/// A KafkaRecord is the minimal storage unit used by Kafka client and Kafka server. +/// The Kafka client produces KafkaRecords and consumes KafkaRecords, and Kafka server stores +/// a collection of KafkaRecords. +#[derive(Debug, Clone, PartialEq)] +pub(crate) struct Record { + /// The metadata of the record. + pub(crate) meta: RecordMeta, + /// The payload of the record. + data: Vec, +} + +impl TryFrom for KafkaRecord { + type Error = crate::error::Error; + + fn try_from(record: Record) -> Result { + let key = serde_json::to_vec(&record.meta).context(EncodeJsonSnafu)?; + Ok(KafkaRecord { + key: Some(key), + value: Some(record.data), + timestamp: chrono::Utc::now(), + headers: Default::default(), + }) + } +} + +impl TryFrom for Record { + type Error = crate::error::Error; + + fn try_from(kafka_record: KafkaRecord) -> Result { + let key = kafka_record.key.context(MissingKeySnafu)?; + let meta = serde_json::from_slice(&key).context(DecodeJsonSnafu)?; + let data = kafka_record.value.context(MissingValueSnafu)?; + Ok(Self { meta, data }) + } +} + +impl From> for EntryImpl { + fn from(records: Vec) -> Self { + let entry_id = records[0].meta.entry_id; + let ns = records[0].meta.ns.clone(); + let data = records.into_iter().flat_map(|record| record.data).collect(); + EntryImpl { + data, + id: entry_id, + ns, + } + } +} + +/// Produces a record to a kafka topic. +pub(crate) struct RecordProducer { + /// The namespace of the entries. + ns: NamespaceImpl, + /// Entries are buffered before being built into a record. + entries: Vec, +} + +impl RecordProducer { + /// Creates a new producer for producing entries with the given namespace. + pub(crate) fn new(ns: NamespaceImpl) -> Self { + Self { + ns, + entries: Vec::new(), + } + } + + /// Populates the entry buffer with the given entries. + pub(crate) fn with_entries(self, entries: Vec) -> Self { + Self { entries, ..self } + } + + /// Pushes an entry into the entry buffer. + pub(crate) fn push(&mut self, entry: EntryImpl) { + self.entries.push(entry); + } + + /// Produces the buffered entries to Kafka sever. Those entries may span several Kafka records. + pub(crate) async fn produce(self, client_manager: &ClientManagerRef) -> Result { + ensure!(!self.entries.is_empty(), EmptyEntriesSnafu); + + // Gets the producer in which a record buffer is maintained. + let producer = client_manager + .get_or_insert(&self.ns.topic) + .await + .map_err(|e| { + GetClientSnafu { + topic: &self.ns.topic, + error: e.to_string(), + } + .build() + })? + .producer; + + // Stores the offset of the last successfully produced record. + let mut last_offset = None; + let max_record_size = + client_manager.config.max_batch_size.as_bytes() as usize - ESTIMATED_META_SIZE; + for entry in self.entries { + for record in build_records(entry, max_record_size) { + let kafka_record = KafkaRecord::try_from(record)?; + // Records of a certain region cannot be produced in parallel since their order must be static. + let offset = producer + .produce(kafka_record.clone()) + .await + .map(Offset) + .with_context(|_| ProduceRecordSnafu { + topic: &self.ns.topic, + size: kafka_record.approximate_size(), + limit: max_record_size, + })?; + last_offset = Some(offset); + } + } + // Safety: there must be at least one record produced when the entries are guaranteed not empty. 
+ Ok(last_offset.unwrap()) + } +} + +fn record_type(seq: usize, num_records: usize) -> RecordType { + if seq == 0 { + RecordType::First + } else if seq == num_records - 1 { + RecordType::Last + } else { + RecordType::Middle(seq) + } +} + +fn build_records(entry: EntryImpl, max_record_size: usize) -> Vec { + if entry.data.len() <= max_record_size { + let record = Record { + meta: RecordMeta { + version: VERSION, + tp: RecordType::Full, + entry_id: entry.id, + ns: entry.ns, + }, + data: entry.data, + }; + return vec![record]; + } + + let chunks = entry.data.chunks(max_record_size); + let num_chunks = chunks.len(); + chunks + .enumerate() + .map(|(i, chunk)| Record { + meta: RecordMeta { + version: VERSION, + tp: record_type(i, num_chunks), + entry_id: entry.id, + ns: entry.ns.clone(), + }, + data: chunk.to_vec(), + }) + .collect() +} + +pub fn maybe_emit_entry( + record: Record, + entry_records: &mut HashMap>, +) -> Result> { + let mut entry = None; + match record.meta.tp { + RecordType::Full => { + entry = Some(EntryImpl::from(vec![record])); + } + RecordType::First => { + ensure!( + !entry_records.contains_key(&record.meta.entry_id), + IllegalSequenceSnafu { + error: "First record must be the first" + } + ); + entry_records.insert(record.meta.entry_id, vec![record]); + } + RecordType::Middle(seq) => { + let prefix = + entry_records + .get_mut(&record.meta.entry_id) + .context(IllegalSequenceSnafu { + error: "Middle record must not be the first", + })?; + // Safety: the records are guaranteed not empty if the key exists. + let last_record = prefix.last().unwrap(); + let legal = match last_record.meta.tp { + // Legal if this record follows a First record. + RecordType::First => seq == 1, + // Legal if this record follows a Middle record just prior to this record. + RecordType::Middle(last_seq) => last_seq + 1 == seq, + // Illegal sequence. + _ => false, + }; + ensure!( + legal, + IllegalSequenceSnafu { + error: "Illegal prefix for a Middle record" + } + ); + + prefix.push(record); + } + RecordType::Last => { + // There must have a sequence prefix before a Last record is read. + let mut records = + entry_records + .remove(&record.meta.entry_id) + .context(IllegalSequenceSnafu { + error: "Missing prefix for a Last record", + })?; + records.push(record); + entry = Some(EntryImpl::from(records)); + } + } + Ok(entry) +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use common_base::readable_size::ReadableSize; + use common_config::wal::KafkaConfig; + use rand::Rng; + + use super::*; + use crate::kafka::client_manager::ClientManager; + + // Implements some utility methods for testing. + impl Default for Record { + fn default() -> Self { + Self { + meta: RecordMeta { + version: VERSION, + tp: RecordType::Full, + ns: NamespaceImpl { + region_id: 0, + topic: "greptimedb_wal_topic".to_string(), + }, + entry_id: 0, + }, + data: Vec::new(), + } + } + } + + impl Record { + /// Overrides tp. + fn with_tp(&self, tp: RecordType) -> Self { + Self { + meta: RecordMeta { + tp, + ..self.meta.clone() + }, + ..self.clone() + } + } + + /// Overrides data with the given data. + fn with_data(&self, data: &[u8]) -> Self { + Self { + data: data.to_vec(), + ..self.clone() + } + } + + /// Overrides entry id. + fn with_entry_id(&self, entry_id: EntryId) -> Self { + Self { + meta: RecordMeta { + entry_id, + ..self.meta.clone() + }, + ..self.clone() + } + } + + /// Overrides namespace. 
+ fn with_ns(&self, ns: NamespaceImpl) -> Self { + Self { + meta: RecordMeta { ns, ..self.meta }, + ..self.clone() + } + } + } + + fn new_test_entry>(data: D, entry_id: EntryId, ns: NamespaceImpl) -> EntryImpl { + EntryImpl { + data: data.as_ref().to_vec(), + id: entry_id, + ns, + } + } + + /// Tests that the `build_records` works as expected. + #[test] + fn test_build_records() { + let max_record_size = 128; + + // On a small entry. + let ns = NamespaceImpl { + region_id: 1, + topic: "greptimedb_wal_topic".to_string(), + }; + let entry = new_test_entry([b'1'; 100], 0, ns.clone()); + let records = build_records(entry.clone(), max_record_size); + assert!(records.len() == 1); + assert_eq!(entry.data, records[0].data); + + // On a large entry. + let entry = new_test_entry([b'1'; 150], 0, ns.clone()); + let records = build_records(entry.clone(), max_record_size); + assert!(records.len() == 2); + assert_eq!(&records[0].data, &[b'1'; 128]); + assert_eq!(&records[1].data, &[b'1'; 22]); + + // On a way-too large entry. + let entry = new_test_entry([b'1'; 5000], 0, ns.clone()); + let records = build_records(entry.clone(), max_record_size); + let matched = entry + .data + .chunks(max_record_size) + .enumerate() + .all(|(i, chunk)| records[i].data == chunk); + assert!(matched); + } + + /// Tests that Record and KafkaRecord are able to be converted back and forth. + #[test] + fn test_record_conversion() { + let record = Record { + meta: RecordMeta { + version: VERSION, + tp: RecordType::Full, + entry_id: 1, + ns: NamespaceImpl { + region_id: 1, + topic: "greptimedb_wal_topic".to_string(), + }, + }, + data: b"12345".to_vec(), + }; + let kafka_record: KafkaRecord = record.clone().try_into().unwrap(); + let got = Record::try_from(kafka_record).unwrap(); + assert_eq!(record, got); + } + + /// Tests that the reconstruction of an entry works as expected. + #[test] + fn test_reconstruct_entry() { + let template = Record::default(); + let records = vec![ + template.with_data(b"111").with_tp(RecordType::First), + template.with_data(b"222").with_tp(RecordType::Middle(1)), + template.with_data(b"333").with_tp(RecordType::Last), + ]; + let entry = EntryImpl::from(records.clone()); + assert_eq!(records[0].meta.entry_id, entry.id); + assert_eq!(records[0].meta.ns, entry.ns); + assert_eq!( + entry.data, + records + .into_iter() + .flat_map(|record| record.data) + .collect::>() + ); + } + + /// Tests that `maybe_emit_entry` works as expected. + /// This test does not check for illegal record sequences since they're already tested in the `test_check_records` test. + #[test] + fn test_maybe_emit_entry() { + let ns = NamespaceImpl { + region_id: 1, + topic: "greptimedb_wal_topic".to_string(), + }; + let template = Record::default().with_ns(ns); + let mut entry_records = HashMap::from([ + ( + 1, + vec![template.with_entry_id(1).with_tp(RecordType::First)], + ), + ( + 2, + vec![template.with_entry_id(2).with_tp(RecordType::First)], + ), + ( + 3, + vec![ + template.with_entry_id(3).with_tp(RecordType::First), + template.with_entry_id(3).with_tp(RecordType::Middle(1)), + ], + ), + ]); + + // A Full record arrives. + let got = maybe_emit_entry( + template.with_entry_id(0).with_tp(RecordType::Full), + &mut entry_records, + ) + .unwrap(); + assert!(got.is_some()); + + // A First record arrives with no prefix. + let got = maybe_emit_entry( + template.with_entry_id(0).with_tp(RecordType::First), + &mut entry_records, + ) + .unwrap(); + assert!(got.is_none()); + + // A First record arrives with some prefix. 
+ let got = maybe_emit_entry( + template.with_entry_id(1).with_tp(RecordType::First), + &mut entry_records, + ); + assert!(got.is_err()); + + // A Middle record arrives with legal prefix (First). + let got = maybe_emit_entry( + template.with_entry_id(2).with_tp(RecordType::Middle(1)), + &mut entry_records, + ) + .unwrap(); + assert!(got.is_none()); + + // A Middle record arrives with legal prefix (Middle). + let got = maybe_emit_entry( + template.with_entry_id(2).with_tp(RecordType::Middle(2)), + &mut entry_records, + ) + .unwrap(); + assert!(got.is_none()); + + // A Middle record arrives with illegal prefix. + let got = maybe_emit_entry( + template.with_entry_id(2).with_tp(RecordType::Middle(1)), + &mut entry_records, + ); + assert!(got.is_err()); + + // A Middle record arrives with no prefix. + let got = maybe_emit_entry( + template.with_entry_id(22).with_tp(RecordType::Middle(1)), + &mut entry_records, + ); + assert!(got.is_err()); + + // A Last record arrives with no prefix. + let got = maybe_emit_entry( + template.with_entry_id(33).with_tp(RecordType::Last), + &mut entry_records, + ); + assert!(got.is_err()); + + // A Last record arrives with legal prefix. + let got = maybe_emit_entry( + template.with_entry_id(3).with_tp(RecordType::Last), + &mut entry_records, + ) + .unwrap(); + assert!(got.is_some()); + + // Check state. + assert_eq!(entry_records.len(), 3); + assert_eq!(entry_records[&0].len(), 1); + assert_eq!(entry_records[&1].len(), 1); + assert_eq!(entry_records[&2].len(), 3); + } + + #[tokio::test] + async fn test_produce_large_entry() { + let topic = format!("greptimedb_wal_topic_{}", rand::thread_rng().gen::()); + let ns = NamespaceImpl { + region_id: 1, + topic, + }; + let entry = new_test_entry([b'1'; 2000000], 0, ns.clone()); + let producer = RecordProducer::new(ns.clone()).with_entries(vec![entry]); + + // TODO(niebayes): get broker endpoints from env vars. 
+ let config = KafkaConfig { + broker_endpoints: vec!["localhost:9092".to_string()], + max_batch_size: ReadableSize::mb(1), + ..Default::default() + }; + let manager = Arc::new(ClientManager::try_new(&config).await.unwrap()); + producer.produce(&manager).await.unwrap(); + } +} diff --git a/src/mito2/src/region_write_ctx.rs b/src/mito2/src/region_write_ctx.rs index 6cf8043174a1..5cb081d2a533 100644 --- a/src/mito2/src/region_write_ctx.rs +++ b/src/mito2/src/region_write_ctx.rs @@ -165,6 +165,7 @@ impl RegionWriteCtx { &self.wal_entry, &self.wal_options, )?; + self.next_entry_id += 1; Ok(()) } diff --git a/tests/cases/standalone/common/types/string/scan_big_varchar.result b/tests/cases/standalone/common/types/string/scan_big_varchar.result index 5a14cc0e1996..2e26e72a5067 100644 --- a/tests/cases/standalone/common/types/string/scan_big_varchar.result +++ b/tests/cases/standalone/common/types/string/scan_big_varchar.result @@ -126,22 +126,111 @@ SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable; | 128 | 128 | 10000 | 1280000 | +----------+-------------------+-----------------------------------+-----------------------------------+ --- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 53 FROM bigtable; --- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable; --- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 57 FROM bigtable; --- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable; --- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 61 FROM bigtable; --- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable; --- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 63 FROM bigtable; --- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable; --- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 67 FROM bigtable; --- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable; --- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 71 FROM bigtable; --- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable; --- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 73 FROM bigtable; --- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable; --- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 79 FROM bigtable; --- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable; +INSERT INTO bigtable SELECT a, to_unixtime(ts) * 53 FROM bigtable; + +Affected Rows: 128 + +SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable; + ++----------+-------------------+-----------------------------------+-----------------------------------+ +| COUNT(*) | COUNT(bigtable.a) | MAX(character_length(bigtable.a)) | SUM(character_length(bigtable.a)) | ++----------+-------------------+-----------------------------------+-----------------------------------+ +| 256 | 256 | 10000 | 2560000 | ++----------+-------------------+-----------------------------------+-----------------------------------+ + +INSERT INTO bigtable SELECT a, to_unixtime(ts) * 57 FROM bigtable; + +Affected Rows: 256 + +SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable; + ++----------+-------------------+-----------------------------------+-----------------------------------+ +| COUNT(*) | COUNT(bigtable.a) | MAX(character_length(bigtable.a)) | SUM(character_length(bigtable.a)) | ++----------+-------------------+-----------------------------------+-----------------------------------+ +| 512 | 512 | 10000 | 5120000 | 
++----------+-------------------+-----------------------------------+-----------------------------------+ + +INSERT INTO bigtable SELECT a, to_unixtime(ts) * 61 FROM bigtable; + +Affected Rows: 512 + +SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable; + ++----------+-------------------+-----------------------------------+-----------------------------------+ +| COUNT(*) | COUNT(bigtable.a) | MAX(character_length(bigtable.a)) | SUM(character_length(bigtable.a)) | ++----------+-------------------+-----------------------------------+-----------------------------------+ +| 1024 | 1024 | 10000 | 10240000 | ++----------+-------------------+-----------------------------------+-----------------------------------+ + +INSERT INTO bigtable SELECT a, to_unixtime(ts) * 63 FROM bigtable; + +Affected Rows: 1024 + +SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable; + ++----------+-------------------+-----------------------------------+-----------------------------------+ +| COUNT(*) | COUNT(bigtable.a) | MAX(character_length(bigtable.a)) | SUM(character_length(bigtable.a)) | ++----------+-------------------+-----------------------------------+-----------------------------------+ +| 2048 | 2048 | 10000 | 20480000 | ++----------+-------------------+-----------------------------------+-----------------------------------+ + +INSERT INTO bigtable SELECT a, to_unixtime(ts) * 67 FROM bigtable; + +Affected Rows: 2048 + +SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable; + ++----------+-------------------+-----------------------------------+-----------------------------------+ +| COUNT(*) | COUNT(bigtable.a) | MAX(character_length(bigtable.a)) | SUM(character_length(bigtable.a)) | ++----------+-------------------+-----------------------------------+-----------------------------------+ +| 4096 | 4096 | 10000 | 40960000 | ++----------+-------------------+-----------------------------------+-----------------------------------+ + +INSERT INTO bigtable SELECT a, to_unixtime(ts) * 71 FROM bigtable; + +Affected Rows: 4096 + +SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable; + ++----------+-------------------+-----------------------------------+-----------------------------------+ +| COUNT(*) | COUNT(bigtable.a) | MAX(character_length(bigtable.a)) | SUM(character_length(bigtable.a)) | ++----------+-------------------+-----------------------------------+-----------------------------------+ +| 8192 | 8192 | 10000 | 81920000 | ++----------+-------------------+-----------------------------------+-----------------------------------+ + +-- SQLNESS ARG restart=true +SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable; + ++----------+-------------------+-----------------------------------+-----------------------------------+ +| COUNT(*) | COUNT(bigtable.a) | MAX(character_length(bigtable.a)) | SUM(character_length(bigtable.a)) | ++----------+-------------------+-----------------------------------+-----------------------------------+ +| 8192 | 8192 | 10000 | 81920000 | ++----------+-------------------+-----------------------------------+-----------------------------------+ + +INSERT INTO bigtable SELECT a, to_unixtime(ts) * 73 FROM bigtable; + +Affected Rows: 8192 + +SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable; + ++----------+-------------------+-----------------------------------+-----------------------------------+ +| COUNT(*) | COUNT(bigtable.a) | MAX(character_length(bigtable.a)) | 
SUM(character_length(bigtable.a)) | ++----------+-------------------+-----------------------------------+-----------------------------------+ +| 16384 | 16384 | 10000 | 163840000 | ++----------+-------------------+-----------------------------------+-----------------------------------+ + +INSERT INTO bigtable SELECT a, to_unixtime(ts) * 79 FROM bigtable; + +Affected Rows: 16384 + +SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable; + ++----------+-------------------+-----------------------------------+-----------------------------------+ +| COUNT(*) | COUNT(bigtable.a) | MAX(character_length(bigtable.a)) | SUM(character_length(bigtable.a)) | ++----------+-------------------+-----------------------------------+-----------------------------------+ +| 32768 | 32768 | 10000 | 327680000 | ++----------+-------------------+-----------------------------------+-----------------------------------+ + DROP TABLE test; Affected Rows: 0 diff --git a/tests/cases/standalone/common/types/string/scan_big_varchar.sql b/tests/cases/standalone/common/types/string/scan_big_varchar.sql index d9ce27e041f8..1586fa230bbc 100644 --- a/tests/cases/standalone/common/types/string/scan_big_varchar.sql +++ b/tests/cases/standalone/common/types/string/scan_big_varchar.sql @@ -51,38 +51,41 @@ INSERT INTO bigtable SELECT a, to_unixtime(ts) * 51 FROM bigtable; SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable; --- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 53 FROM bigtable; +INSERT INTO bigtable SELECT a, to_unixtime(ts) * 53 FROM bigtable; --- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable; +SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable; --- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 57 FROM bigtable; +INSERT INTO bigtable SELECT a, to_unixtime(ts) * 57 FROM bigtable; --- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable; +SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable; --- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 61 FROM bigtable; +INSERT INTO bigtable SELECT a, to_unixtime(ts) * 61 FROM bigtable; --- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable; +SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable; --- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 63 FROM bigtable; +INSERT INTO bigtable SELECT a, to_unixtime(ts) * 63 FROM bigtable; + +SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable; --- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable; +INSERT INTO bigtable SELECT a, to_unixtime(ts) * 67 FROM bigtable; --- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 67 FROM bigtable; +SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable; --- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable; +INSERT INTO bigtable SELECT a, to_unixtime(ts) * 71 FROM bigtable; --- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 71 FROM bigtable; +SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable; --- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable; +-- SQLNESS ARG restart=true +SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable; --- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 73 FROM bigtable; +INSERT INTO bigtable SELECT a, to_unixtime(ts) * 73 FROM bigtable; --- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable; +SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), 
SUM(LENGTH(a)) FROM bigtable; --- INSERT INTO bigtable SELECT a, to_unixtime(ts) * 79 FROM bigtable; +INSERT INTO bigtable SELECT a, to_unixtime(ts) * 79 FROM bigtable; --- SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable; +SELECT COUNT(*), COUNT(a), MAX(LENGTH(a)), SUM(LENGTH(a)) FROM bigtable; DROP TABLE test;
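The new 1 MB default for `max_batch_size` matches the warning added to the config files: Kafka brokers reject batches larger than `message.max.bytes`, which defaults to roughly 1 MB, and the enriched `ProduceRecord { size, limit }` error now surfaces both the offending record size and the producer-side limit. Below is a minimal arithmetic sketch of the per-record payload budget, assuming the values used in this patch (1 MB limit, 256-byte `ESTIMATED_META_SIZE`); the constant and helper names are illustrative, not crate API.

// Sketch only: mirrors the sizing values from this patch, not the crate's types.
const MAX_BATCH_SIZE: usize = 1024 * 1024; // 1 MB producer batch limit
const ESTIMATED_META_SIZE: usize = 256; // estimated serialized RecordMeta size

/// Largest entry payload that still fits into a single Kafka record.
fn max_record_payload() -> usize {
    MAX_BATCH_SIZE - ESTIMATED_META_SIZE
}

/// Number of Kafka records an entry of `entry_len` bytes is split into.
fn num_records(entry_len: usize) -> usize {
    let budget = max_record_payload();
    // Ceiling division; an empty entry still becomes one Full record.
    ((entry_len + budget - 1) / budget).max(1)
}

fn main() {
    assert_eq!(num_records(100), 1); // small entry: one Full record
    assert_eq!(num_records(2_000_000), 2); // the 2_000_000-byte entry from test_produce_large_entry: First + Last
    println!("payload budget per record: {} bytes", max_record_payload());
}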
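The core of the patch is the Full/First/Middle/Last framing in `src/log-store/src/kafka/util/record.rs`: an entry that fits under the budget becomes one Full record, otherwise it is chunked on the producer side and reassembled on the consumer side by `maybe_emit_entry`, which buffers records per entry id and validates the sequence. The following standalone sketch shows the intended round trip; `MiniRecord`, `split_entry`, and `feed` are simplified stand-ins, not the crate's types.

use std::collections::HashMap;

#[derive(Debug, Clone)]
enum RecordType {
    Full,
    First,
    Middle(usize),
    Last,
}

#[derive(Debug, Clone)]
struct MiniRecord {
    tp: RecordType,
    entry_id: u64,
    data: Vec<u8>,
}

/// Splits an entry payload into records whose data fits into `max_payload` bytes.
fn split_entry(entry_id: u64, data: &[u8], max_payload: usize) -> Vec<MiniRecord> {
    if data.len() <= max_payload {
        return vec![MiniRecord { tp: RecordType::Full, entry_id, data: data.to_vec() }];
    }
    let chunks: Vec<&[u8]> = data.chunks(max_payload).collect();
    let last = chunks.len() - 1;
    chunks
        .into_iter()
        .enumerate()
        .map(|(i, chunk)| MiniRecord {
            tp: match i {
                0 => RecordType::First,
                i if i == last => RecordType::Last,
                i => RecordType::Middle(i),
            },
            entry_id,
            data: chunk.to_vec(),
        })
        .collect()
}

/// Consumer-side reassembly in the spirit of `maybe_emit_entry`: buffer the records
/// of an entry until its Last record arrives, then emit the reassembled payload.
fn feed(
    record: MiniRecord,
    buffered: &mut HashMap<u64, Vec<MiniRecord>>,
) -> Result<Option<Vec<u8>>, String> {
    match record.tp {
        RecordType::Full => Ok(Some(record.data)),
        RecordType::First => {
            if buffered.contains_key(&record.entry_id) {
                return Err("First record must be the first of its entry".to_string());
            }
            buffered.insert(record.entry_id, vec![record]);
            Ok(None)
        }
        RecordType::Middle(seq) => {
            let prefix = buffered
                .get_mut(&record.entry_id)
                .ok_or("Middle record must not be the first")?;
            // A Middle(seq) record is only legal right after First (seq == 1)
            // or after Middle(seq - 1).
            let legal = match prefix.last().map(|r| &r.tp) {
                Some(RecordType::First) => seq == 1,
                Some(RecordType::Middle(prev)) => *prev + 1 == seq,
                _ => false,
            };
            if !legal {
                return Err("Illegal prefix for a Middle record".to_string());
            }
            prefix.push(record);
            Ok(None)
        }
        RecordType::Last => {
            let mut records = buffered
                .remove(&record.entry_id)
                .ok_or("Missing prefix for a Last record")?;
            records.push(record);
            Ok(Some(records.into_iter().flat_map(|r| r.data).collect()))
        }
    }
}

fn main() -> Result<(), String> {
    // Same shape as `test_produce_large_entry`: a 2_000_000-byte payload split
    // under a roughly 1 MB budget yields a First and a Last record.
    let payload = vec![b'x'; 2_000_000];
    let records = split_entry(42, &payload, 1024 * 1024 - 256);
    assert_eq!(records.len(), 2);

    let mut buffered = HashMap::new();
    let mut reassembled = Vec::new();
    for record in records {
        if let Some(data) = feed(record, &mut buffered)? {
            reassembled = data;
        }
    }
    assert_eq!(reassembled, payload);
    Ok(())
}

Because records of one region are produced sequentially, the chunks of an entry arrive in order, and the sequence checks above correspond to the `IllegalSequence` error variant added in `error.rs`.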