Skip to content

Commit

Permalink
Merge branch 'main' into feat/pg_table_is_visible
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 committed Jan 21, 2025
2 parents f4c80fe + ff49a86 commit b127fc4
Show file tree
Hide file tree
Showing 81 changed files with 3,233 additions and 2,591 deletions.
149 changes: 91 additions & 58 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions ci/scripts/e2e-iceberg-sink-v2-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ poetry run python main.py -t ./test_case/iceberg_source_position_delete.toml
poetry run python main.py -t ./test_case/iceberg_source_all_delete.toml
poetry run python main.py -t ./test_case/iceberg_source_explain_for_delete.toml
poetry run python main.py -t ./test_case/iceberg_predicate_pushdown.toml
poetry run python main.py -t ./test_case/iceberg_connection.toml

echo "--- Running benchmarks"
poetry run python main.py -t ./benches/predicate_pushdown.toml
Expand Down
4 changes: 2 additions & 2 deletions ci/workflows/main-cron.yml
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ steps:
plugins:
- docker-compose#v5.5.0: *docker-compose
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 12
timeout_in_minutes: 15
retry: *auto-retry

- group: "end-to-end connector test (release)"
Expand Down Expand Up @@ -440,7 +440,7 @@ steps:
plugins:
- docker-compose#v5.5.0: *docker-compose
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 15
timeout_in_minutes: 17
retry: *auto-retry

- label: "e2e java-binding test (release)"
Expand Down
2 changes: 1 addition & 1 deletion ci/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ steps:
<<: *docker-compose
run: sink-test-env
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 17
timeout_in_minutes: 19
retry: *auto-retry

- label: "end-to-end iceberg cdc test"
Expand Down
67 changes: 67 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_connection.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
statement ok
set streaming_parallelism=4;

statement ok
CREATE TABLE s1 (i1 int, i2 varchar, i3 varchar);

statement ok
CREATE CONNECTION CONN WITH (
type = 'iceberg',
catalog.name = 'demo',
catalog.type = 'storage',
warehouse.path = 's3a://hummock001/iceberg-data',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin'
);

statement ok
CREATE SINK sink1 from s1 WITH (
connector = 'iceberg',
type = 'upsert',
database.name = 'demo_db',
table.name = 'test_connection_table',
connection = conn,
create_table_if_not_exists = 'true',
commit_checkpoint_interval = 1,
primary_key = 'i1,i2',
);

statement ok
INSERT INTO s1 (i1, i2, i3) values(1,'1','1'),(2,'2','2'),(3,'3','3'),(4,'4','4'),(5,'5','5');

statement ok
flush

statement ok
CREATE SOURCE iceberg_t1_source
WITH (
connector = 'iceberg',
connection = conn,
database.name = 'demo_db',
table.name = 'test_connection_table',
);

sleep 2s

query I
select * from iceberg_t1_source order by i1 limit 5;
----
1 1 1
2 2 2
3 3 3
4 4 4
5 5 5

statement ok
DROP SINK sink1;

statement ok
DROP SOURCE iceberg_t1_source;

statement ok
DROP TABLE s1 cascade;

statement ok
DROP CONNECTION conn;
11 changes: 11 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_connection.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
init_sqls = [
'CREATE SCHEMA IF NOT EXISTS demo_db',
'DROP TABLE IF EXISTS demo_db.test_connection_table',
]

slt = 'test_case/iceberg_connection.slt'

drop_sqls = [
'DROP TABLE IF EXISTS demo_db.test_connection_table',
'DROP SCHEMA IF EXISTS demo_db',
]
5 changes: 5 additions & 0 deletions e2e_test/source_inline/commands.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ script = '''
set -e
if [ -n "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" ]; then
echo "Deleting all Kafka consumer groups..."
rpk group list | tail -n +2 | awk '{print $2}' | while read -r group; do
echo "Deleting Kafka consumer group: $group"
rpk group delete "$group"
done
echo "Deleting all Kafka topics..."
rpk topic delete -r "*"
echo "Deleting all schema registry subjects"
Expand Down
6 changes: 6 additions & 0 deletions e2e_test/source_inline/kafka/alter/add_column_shared.slt
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ select * from mv_before_alter;
statement ok
alter source s add column v3 varchar;

# Demonstrate definition change.
query T
SELECT SUBSTRING(definition, 1, POSITION(' WITH' IN definition) - 1) FROM rw_sources WHERE name = 's';
----
CREATE SOURCE s (v1 INT, v2 CHARACTER VARYING, v3 CHARACTER VARYING)

# New MV will have v3.

# Check it should still be shared source <https://github.com/risingwavelabs/risingwave/issues/19799>
Expand Down
6 changes: 6 additions & 0 deletions e2e_test/source_inline/kafka/avro/alter_source.slt
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ No tracking issue yet. Feel free to submit a feature request at https://github.c
statement ok
alter source s format plain encode avro (schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}');

# Demonstrate definition change.
query T
SELECT SUBSTRING(definition, 1, POSITION(' WITH' IN definition) - 1) FROM rw_sources WHERE name = 's';
----
CREATE SOURCE s (foo CHARACTER VARYING, bar INT)

query ??
select * from s
----
Expand Down
20 changes: 3 additions & 17 deletions e2e_test/source_inline/kafka/consumer_group.slt.serial
Original file line number Diff line number Diff line change
Expand Up @@ -58,28 +58,14 @@ EOF
# (enable.auto.commit defaults to true, and auto.commit.interval.ms defaults to 5s)
sleep 5s

system ok
./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv list-members
----
0


# The lag for MV's group is 0.
system ok
./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv list-lags
----
0


# Test delete consumer group on drop

# my_group: 1 source fragment, 1 backfill fragment, 1 batch query
# TODO: drop backfill fragment on backfill finish
# my_group: 1 source fragment, 1 batch query, (1 backfill fragment's group is already dropped after backfill finished)
# We only check my_group to avoid interfering with other tests.
system ok retry 3 backoff 5s
./e2e_test/source_inline/kafka/consumer_group.mjs count-groups | grep my_group
----
my_group: 3
my_group: 2


statement ok
Expand All @@ -90,7 +76,7 @@ sleep 1s
system ok retry 3 backoff 5s
./e2e_test/source_inline/kafka/consumer_group.mjs count-groups | grep my_group
----
my_group: 2
my_group: 1


system ok
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ c
# (enable.auto.commit defaults to true, and auto.commit.interval.ms defaults to 5s)
sleep 5s

system ok
system ok retry 3 backoff 5s
./e2e_test/source_inline/kafka/consumer_group.mjs --mv mv list-members
----
0
Expand Down
10 changes: 10 additions & 0 deletions e2e_test/source_inline/kafka/protobuf/alter_source_shared.slt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ FORMAT PLAIN ENCODE PROTOBUF(
message = 'test.User'
);

query T
SELECT SUBSTRING(definition, 1, POSITION(' WITH' IN definition) - 1) FROM rw_sources WHERE name = 'src_user';
----
CREATE SOURCE src_user (id INT, name CHARACTER VARYING, address CHARACTER VARYING, city CHARACTER VARYING, gender CHARACTER VARYING, sc STRUCT<file_name CHARACTER VARYING>) INCLUDE timestamp

statement ok
CREATE MATERIALIZED VIEW mv_user AS SELECT * FROM src_user;

Expand All @@ -45,6 +50,11 @@ set streaming_use_shared_source to false;
statement ok
ALTER SOURCE src_user REFRESH SCHEMA;

query T
SELECT SUBSTRING(definition, 1, POSITION(' WITH' IN definition) - 1) FROM rw_sources WHERE name = 'src_user';
----
CREATE SOURCE src_user (id INT, name CHARACTER VARYING, address CHARACTER VARYING, city CHARACTER VARYING, gender CHARACTER VARYING, sc STRUCT<file_name CHARACTER VARYING>, age INT) INCLUDE timestamp

# Check it should still be shared source <https://github.com/risingwavelabs/risingwave/issues/19799>
query
EXPLAIN CREATE MATERIALIZED VIEW mv_user_more AS SELECT * FROM src_user;
Expand Down
8 changes: 4 additions & 4 deletions e2e_test/source_inline/kafka/shared_source.slt.serial
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,10 @@ select v1, v2 from mv_1;
system ok
internal_table.mjs --name mv_1 --type sourcebackfill
----
0,"{""num_consumed_rows"": 2, ""state"": ""Finished"", ""target_offset"": ""0""}"
1,"{""num_consumed_rows"": 2, ""state"": ""Finished"", ""target_offset"": ""0""}"
2,"{""num_consumed_rows"": 3, ""state"": ""Finished"", ""target_offset"": ""1""}"
3,"{""num_consumed_rows"": 4, ""state"": ""Finished"", ""target_offset"": ""2""}"
0,"{""num_consumed_rows"": 1, ""state"": ""Finished"", ""target_offset"": ""0""}"
1,"{""num_consumed_rows"": 1, ""state"": ""Finished"", ""target_offset"": ""0""}"
2,"{""num_consumed_rows"": 2, ""state"": ""Finished"", ""target_offset"": ""1""}"
3,"{""num_consumed_rows"": 3, ""state"": ""Finished"", ""target_offset"": ""2""}"


system ok
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/datagen/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ require (
go.uber.org/atomic v1.7.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/term v0.27.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions integration_tests/datagen/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -538,8 +538,8 @@ golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qx
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY5RMuhpJ3cNnI0XpyFJD1iQRSM=
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/debezium-mysql/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ services:
retries: 5
container_name: mysql
debezium:
image: debezium-connect
image: debezium/connect:1.9
build: .
environment:
BOOTSTRAP_SERVERS: message_queue:29092
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/debezium-postgres/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ services:
- "./postgres_prepare.sql:/postgres_prepare.sql"

debezium:
image: debezium-connect
image: debezium/connect:1.9
build: .
environment:
BOOTSTRAP_SERVERS: message_queue:29092
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/debezium-sqlserver/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ services:
- ./sqlserver_prepare.sql:/sqlserver_prepare.sql

debezium:
image: debezium-connect
image: debezium/connect:1.9
build: .
environment:
BOOTSTRAP_SERVERS: message_queue:29092
Expand Down
Loading

0 comments on commit b127fc4

Please sign in to comment.