Skip to content

Commit

Permalink
pick
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 committed Jan 13, 2025
1 parent f8c7cad commit 149fda2
Show file tree
Hide file tree
Showing 81 changed files with 3,972 additions and 2,963 deletions.
44 changes: 44 additions & 0 deletions e2e_test/batch/basic/rw_timestamp.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table t (a int, id int primary key);

statement ok
create index idx on t(a);

statement ok
insert into t values (1, 1), (2, 2);

query ?? rowsort
select ABS(EXTRACT(EPOCH FROM (_rw_timestamp - now()))) < 2, a, id from t;
----
t 1 1
t 2 2

sleep 3s

statement ok
update t set a = 11 where id = 1;

query ?? rowsort
select ABS(EXTRACT(EPOCH FROM (_rw_timestamp - now()))) < 2, a, id from t;
----
f 2 2
t 11 1

query ?? rowsort
select ABS(EXTRACT(EPOCH FROM (_rw_timestamp - now()))) < 2, a, id from t where id = 1;
----
t 11 1

query ?? rowsort
select ABS(EXTRACT(EPOCH FROM (_rw_timestamp - now()))) < 2, a, id from t where a = 11;
----
t 11 1

statement ok
delete from t;

statement ok
drop table t;
3 changes: 3 additions & 0 deletions e2e_test/ddl/show.slt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ v1 integer false turpis vehicula
v2 integer false Lorem ipsum dolor sit amet
v3 integer false NULL
_row_id serial true consectetur adipiscing elit
_rw_timestamp timestamp with time zone true NULL
primary key _row_id NULL NULL
distribution key _row_id NULL NULL
table description t3 NULL volutpat vitae
Expand All @@ -37,6 +38,7 @@ v1 integer false turpis vehicula
v2 integer false Lorem ipsum dolor sit amet
v3 integer false NULL
_row_id serial true consectetur adipiscing elit
_rw_timestamp timestamp with time zone true NULL

statement ok
create index idx1 on t3 (v1,v2);
Expand Down Expand Up @@ -65,6 +67,7 @@ v1 integer false Nemo enim ipsam
v2 integer false NULL
v3 integer false NULL
_row_id serial true NULL
_rw_timestamp timestamp with time zone true NULL
primary key _row_id NULL NULL
distribution key _row_id NULL NULL
idx1 index(v1 ASC, v2 ASC) include(v3) distributed by(v1) NULL NULL
Expand Down
2 changes: 2 additions & 0 deletions e2e_test/extended_mode/basic.slt
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ v1 integer false NULL
v2 integer false NULL
v3 integer false NULL
_row_id serial true NULL
_rw_timestamp timestamp with time zone true NULL
primary key _row_id NULL NULL
distribution key _row_id NULL NULL
table description t3 NULL NULL
Expand All @@ -57,6 +58,7 @@ v1 integer false NULL
v2 integer false NULL
v3 integer false NULL
_row_id serial true NULL
_rw_timestamp timestamp with time zone true NULL

statement ok
drop table t3;
Expand Down
1 change: 1 addition & 0 deletions e2e_test/source_legacy/basic/nosim_kafka.slt
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ productId bigint false NULL
productName character varying false NULL
tags character varying[] false NULL
_row_id serial true NULL
_rw_timestamp timestamp with time zone true NULL
primary key _row_id NULL NULL
distribution key _row_id NULL NULL
table description kafka_json_schema_plain NULL NULL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ describe my_products;
id integer false NULL
name character varying false NULL
description character varying false NULL
_rw_timestamp timestamp with time zone true NULL
primary key id NULL NULL
distribution key id NULL NULL
table description my_products NULL NULL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ id bigint false NULL
modified timestamp without time zone false NULL
name character varying false NULL
custinfo jsonb false NULL
_rw_timestamp timestamp with time zone true NULL
primary key id NULL NULL
distribution key id NULL NULL
table description rw_customers NULL NULL
Expand All @@ -66,6 +67,7 @@ name character varying false NULL
custinfo jsonb false NULL
v1 character varying false NULL
v2 double precision false NULL
_rw_timestamp timestamp with time zone true NULL
primary key id NULL NULL
distribution key id NULL NULL
table description rw_customers NULL NULL
Expand Down Expand Up @@ -96,6 +98,7 @@ name character varying false NULL
custinfo jsonb false NULL
v1 character varying false NULL
v2 double precision false NULL
_rw_timestamp timestamp with time zone true NULL
primary key id NULL NULL
distribution key id NULL NULL
table description rw_customers NULL NULL
Expand Down Expand Up @@ -126,6 +129,7 @@ describe rw_customers;
id bigint false NULL
name character varying false NULL
custinfo jsonb false NULL
_rw_timestamp timestamp with time zone true NULL
primary key id NULL NULL
distribution key id NULL NULL
table description rw_customers NULL NULL
Expand Down
3 changes: 3 additions & 0 deletions e2e_test/source_legacy/cdc_inline/auto_schema_change_pg.slt
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ v7 timestamp without time zone false NULL
v8 timestamp with time zone false NULL
v9 interval false NULL
v10 jsonb false NULL
_rw_timestamp timestamp with time zone true NULL
primary key id NULL NULL
distribution key id NULL NULL
table description rw_test_schema_change NULL NULL
Expand Down Expand Up @@ -100,6 +101,7 @@ v9 interval false NULL
v10 jsonb false NULL
v11 character varying false NULL
v12 numeric false NULL
_rw_timestamp timestamp with time zone true NULL
primary key id NULL NULL
distribution key id NULL NULL
table description rw_test_schema_change NULL NULL
Expand Down Expand Up @@ -145,6 +147,7 @@ v9 interval false NULL
v10 jsonb false NULL
v11 character varying false NULL
v12 numeric false NULL
_rw_timestamp timestamp with time zone true NULL
primary key id NULL NULL
distribution key id NULL NULL
table description rw_test_schema_change NULL NULL
Expand Down
2 changes: 2 additions & 0 deletions e2e_test/source_legacy/cdc_inline/auto_schema_map_mysql.slt
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ describe rw_customers;
id bigint false NULL
modified timestamp without time zone false NULL
custinfo jsonb false NULL
_rw_timestamp timestamp with time zone true NULL
primary key id NULL NULL
distribution key id NULL NULL
table description rw_customers NULL NULL
Expand Down Expand Up @@ -153,6 +154,7 @@ c_datetime timestamp without time zone false NULL
c_timestamp timestamp with time zone false NULL
c_enum character varying false NULL
c_json jsonb false NULL
_rw_timestamp timestamp with time zone true NULL
primary key c_boolean, c_bigint, c_date NULL NULL
distribution key c_boolean, c_bigint, c_date NULL NULL
table description rw_mysql_types_test NULL NULL
Expand Down
1 change: 1 addition & 0 deletions e2e_test/source_legacy/cdc_inline/auto_schema_map_pg.slt
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ c_interval_array interval[] false NULL
c_jsonb_array jsonb[] false NULL
c_uuid_array character varying[] false NULL
c_enum_array character varying[] false NULL
_rw_timestamp timestamp with time zone true NULL
primary key c_boolean, c_bigint, c_date NULL NULL
distribution key c_boolean, c_bigint, c_date NULL NULL
table description rw_postgres_types_test NULL NULL
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/join/distributed_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ impl<S: StateStore> LookupExecutorBuilder for InnerSideExecutorBuilder<S> {

let pk_prefix = OwnedRow::new(scan_range.eq_conds);

if self.lookup_prefix_len == self.table.pk_indices().len() {
if self.lookup_prefix_len == self.table.pk_indices().len() && !self.table.has_epoch_idx() {
let row = self.table.get_row(&pk_prefix, self.epoch.into()).await?;

if let Some(row) = row {
Expand Down
31 changes: 27 additions & 4 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,17 +408,40 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
) -> Result<Option<OwnedRow>> {
let pk_prefix = scan_range.pk_prefix;
assert!(pk_prefix.len() == table.pk_indices().len());

let timer = histogram.as_ref().map(|histogram| histogram.start_timer());

// Point Get.
let row = table.get_row(&pk_prefix, epoch.into()).await?;
let res = if table.has_epoch_idx() {
// has epoch_idx means we need to select `_rw_timestamp` column which is unsupported by `get_row` interface, so use iterator interface instead.
let range_bounds = (Bound::<OwnedRow>::Unbounded, Bound::Unbounded);
let iter = table
.batch_chunk_iter_with_pk_bounds(
epoch.into(),
&pk_prefix,
range_bounds,
false,
1,
PrefetchOptions::new(false, false),
)
.await?;
pin_mut!(iter);
let chunk = iter.next().await.transpose().map_err(BatchError::from)?;
if let Some(chunk) = chunk {
let row = chunk.row_at(0).0.to_owned_row();
Ok(Some(row))
} else {
Ok(None)
}
} else {
// Point Get.
let row = table.get_row(&pk_prefix, epoch.into()).await?;
Ok(row)
};

if let Some(timer) = timer {
timer.observe_duration()
}

Ok(row)
res
}

#[try_stream(ok = DataChunk, error = BatchError)]
Expand Down
55 changes: 54 additions & 1 deletion src/common/src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use risingwave_pb::plan_common::{
AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, PbColumnCatalog, PbColumnDesc,
};

use super::{row_id_column_desc, USER_COLUMN_ID_OFFSET};
use super::{row_id_column_desc, rw_timestamp_column_desc, USER_COLUMN_ID_OFFSET};
use crate::catalog::{cdc_table_name_column_desc, offset_column_desc, Field, ROW_ID_COLUMN_ID};
use crate::types::DataType;
use crate::util::value_encoding::DatumToProtoExt;
Expand Down Expand Up @@ -101,6 +101,11 @@ impl std::fmt::Display for ColumnId {
}
}

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum SystemColumn {
RwTimestamp,
}

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ColumnDesc {
pub data_type: DataType,
Expand All @@ -112,6 +117,9 @@ pub struct ColumnDesc {
pub description: Option<String>,
pub additional_column: AdditionalColumn,
pub version: ColumnDescVersion,
/// Currently the system column is used for `_rw_timestamp` only and is generated at runtime,
/// so this field is not persisted.
pub system_column: Option<SystemColumn>,
}

impl ColumnDesc {
Expand All @@ -126,6 +134,7 @@ impl ColumnDesc {
description: None,
additional_column: AdditionalColumn { column_type: None },
version: ColumnDescVersion::Pr13707,
system_column: None,
}
}

Expand All @@ -140,6 +149,7 @@ impl ColumnDesc {
description: None,
additional_column: AdditionalColumn { column_type: None },
version: ColumnDescVersion::Pr13707,
system_column: None,
}
}

Expand Down Expand Up @@ -180,6 +190,27 @@ impl ColumnDesc {
description: None,
additional_column: additional_column_type,
version: ColumnDescVersion::Pr13707,
system_column: None,
}
}

pub fn named_with_system_column(
name: impl Into<String>,
column_id: ColumnId,
data_type: DataType,
system_column: SystemColumn,
) -> ColumnDesc {
ColumnDesc {
data_type,
column_id,
name: name.into(),
field_descs: vec![],
type_name: String::new(),
generated_or_default_column: None,
description: None,
additional_column: AdditionalColumn { column_type: None },
version: ColumnDescVersion::Pr13707,
system_column: Some(system_column),
}
}

Expand Down Expand Up @@ -229,6 +260,7 @@ impl ColumnDesc {
description: None,
additional_column: AdditionalColumn { column_type: None },
version: ColumnDescVersion::Pr13707,
system_column: None,
}
}

Expand All @@ -252,6 +284,7 @@ impl ColumnDesc {
description: None,
additional_column: AdditionalColumn { column_type: None },
version: ColumnDescVersion::Pr13707,
system_column: None,
}
}

Expand All @@ -270,6 +303,7 @@ impl ColumnDesc {
generated_or_default_column: None,
additional_column: AdditionalColumn { column_type: None },
version: ColumnDescVersion::Pr13707,
system_column: None,
}
}

Expand Down Expand Up @@ -314,6 +348,7 @@ impl From<PbColumnDesc> for ColumnDesc {
description: prost.description.clone(),
additional_column,
version,
system_column: None,
}
}
}
Expand Down Expand Up @@ -372,6 +407,10 @@ impl ColumnCatalog {
self.column_desc.is_generated()
}

pub fn can_dml(&self) -> bool {
!self.is_generated() && !self.is_rw_timestamp_column()
}

/// If the column is a generated column
pub fn generated_expr(&self) -> Option<&ExprNode> {
if let Some(GeneratedOrDefaultColumn::GeneratedColumn(desc)) =
Expand Down Expand Up @@ -424,6 +463,20 @@ impl ColumnCatalog {
}
}

pub fn rw_timestamp_column() -> Self {
Self {
column_desc: rw_timestamp_column_desc(),
is_hidden: true,
}
}

pub fn is_rw_timestamp_column(&self) -> bool {
matches!(
self.column_desc.system_column,
Some(SystemColumn::RwTimestamp)
)
}

pub fn offset_column() -> Self {
Self {
column_desc: offset_column_desc(),
Expand Down
11 changes: 11 additions & 0 deletions src/common/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,17 @@ pub fn row_id_column_desc() -> ColumnDesc {
ColumnDesc::named(ROWID_PREFIX, ROW_ID_COLUMN_ID, DataType::Serial)
}

pub const RW_TIMESTAMP_COLUMN_NAME: &str = "_rw_timestamp";
pub const RW_TIMESTAMP_COLUMN_ID: ColumnId = ColumnId::new(-1);
pub fn rw_timestamp_column_desc() -> ColumnDesc {
ColumnDesc::named_with_system_column(
RW_TIMESTAMP_COLUMN_NAME,
RW_TIMESTAMP_COLUMN_ID,
DataType::Timestamptz,
SystemColumn::RwTimestamp,
)
}

pub const OFFSET_COLUMN_NAME: &str = "_rw_offset";

// The number of columns output by the cdc source job
Expand Down
1 change: 1 addition & 0 deletions src/connector/codec/tests/integration_tests/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ impl<'a> std::fmt::Debug for ColumnDescTestDisplay<'a> {
description,
additional_column: AdditionalColumn { column_type },
version: _,
system_column: _,
} = &self.0;

write!(
Expand Down
Loading

0 comments on commit 149fda2

Please sign in to comment.