From e301f7880e38eaf966a109b0ff5e09a7ea57e16b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 1 May 2024 13:48:44 +0000 Subject: [PATCH] MySQL peer (#1637) Currently testing with [mariadb](https://hub.docker.com/_/mariadb) ``` CREATE PEER my FROM MYSQL WITH ( host = 'mysql', port = '3306', user = 'root', password = 'example', database = 'main', compression = 1, disable_tls = true ); ``` The scope of this PR is only peer creation & nexus proxy. No mirror support Co-authored-by: Kaushik Iska --- flow/connectors/core.go | 7 +- flow/connectors/mysql/mysql.go | 15 + nexus/Cargo.lock | 540 +++++++++++++++--- nexus/Cargo.toml | 18 +- nexus/analyzer/src/lib.rs | 167 ++++-- nexus/analyzer/src/qrep.rs | 12 +- nexus/catalog/src/lib.rs | 9 +- nexus/flow-rs/Cargo.toml | 2 +- nexus/parser/Cargo.toml | 2 +- nexus/parser/src/lib.rs | 42 +- nexus/peer-ast/Cargo.toml | 12 + nexus/peer-ast/src/lib.rs | 101 ++++ nexus/peer-bigquery/Cargo.toml | 9 +- nexus/peer-bigquery/src/ast.rs | 122 +--- nexus/peer-bigquery/src/lib.rs | 98 ++-- nexus/peer-bigquery/src/stream.rs | 2 +- nexus/peer-connections/Cargo.toml | 2 +- nexus/peer-cursor/Cargo.toml | 2 + nexus/peer-cursor/src/lib.rs | 9 + .../cursor.rs => peer-cursor/src/manager.rs} | 61 +- nexus/peer-mysql/Cargo.toml | 27 + nexus/peer-mysql/src/ast.rs | 63 ++ nexus/peer-mysql/src/client.rs | 53 ++ nexus/peer-mysql/src/lib.rs | 232 ++++++++ nexus/peer-mysql/src/stream.rs | 189 ++++++ nexus/peer-postgres/Cargo.toml | 4 +- nexus/peer-snowflake/Cargo.toml | 3 +- nexus/peer-snowflake/src/ast.rs | 3 +- nexus/peer-snowflake/src/cursor.rs | 124 ---- nexus/peer-snowflake/src/lib.rs | 98 ++-- nexus/postgres-connection/Cargo.toml | 2 +- nexus/pt/src/lib.rs | 1 + nexus/server/Cargo.toml | 7 +- nexus/server/src/main.rs | 221 +++---- .../server/tests/results/expected/bq.sql.out | 2 +- .../tests/results/expected/postgres.sql.out | 14 +- nexus/server/tests/server_test.rs | 71 +-- nexus/value/Cargo.toml | 2 +- protos/peers.proto | 14 +- ui/app/api/peers/getTruePeer.ts | 10 +- 40 files changed, 1651 insertions(+), 721 deletions(-) create mode 100644 flow/connectors/mysql/mysql.go create mode 100644 nexus/peer-ast/Cargo.toml create mode 100644 nexus/peer-ast/src/lib.rs rename nexus/{peer-bigquery/src/cursor.rs => peer-cursor/src/manager.rs} (56%) create mode 100644 nexus/peer-mysql/Cargo.toml create mode 100644 nexus/peer-mysql/src/ast.rs create mode 100644 nexus/peer-mysql/src/client.rs create mode 100644 nexus/peer-mysql/src/lib.rs create mode 100644 nexus/peer-mysql/src/stream.rs delete mode 100644 nexus/peer-snowflake/src/cursor.rs diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 545269c4d1..9a9a42f1a3 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -13,6 +13,7 @@ import ( connelasticsearch "github.com/PeerDB-io/peer-flow/connectors/connelasticsearch" conneventhub "github.com/PeerDB-io/peer-flow/connectors/eventhub" connkafka "github.com/PeerDB-io/peer-flow/connectors/kafka" + connmysql "github.com/PeerDB-io/peer-flow/connectors/mysql" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" connpubsub "github.com/PeerDB-io/peer-flow/connectors/pubsub" conns3 "github.com/PeerDB-io/peer-flow/connectors/s3" @@ -224,14 +225,14 @@ func GetConnector(ctx context.Context, config *protos.Peer) (Connector, error) { return connbigquery.NewBigQueryConnector(ctx, inner.BigqueryConfig) case *protos.Peer_SnowflakeConfig: return connsnowflake.NewSnowflakeConnector(ctx, inner.SnowflakeConfig) - case 
*protos.Peer_EventhubConfig: - return nil, errors.New("use eventhub group config instead") case *protos.Peer_EventhubGroupConfig: return conneventhub.NewEventHubConnector(ctx, inner.EventhubGroupConfig) case *protos.Peer_S3Config: return conns3.NewS3Connector(ctx, inner.S3Config) case *protos.Peer_SqlserverConfig: return connsqlserver.NewSQLServerConnector(ctx, inner.SqlserverConfig) + case *protos.Peer_MysqlConfig: + return connmysql.MySqlConnector{}, nil case *protos.Peer_ClickhouseConfig: return connclickhouse.NewClickhouseConnector(ctx, inner.ClickhouseConfig) case *protos.Peer_KafkaConfig: @@ -341,4 +342,6 @@ var ( _ ValidationConnector = &connclickhouse.ClickhouseConnector{} _ ValidationConnector = &connbigquery.BigQueryConnector{} _ ValidationConnector = &conns3.S3Connector{} + + _ Connector = &connmysql.MySqlConnector{} ) diff --git a/flow/connectors/mysql/mysql.go b/flow/connectors/mysql/mysql.go new file mode 100644 index 0000000000..d898826e1f --- /dev/null +++ b/flow/connectors/mysql/mysql.go @@ -0,0 +1,15 @@ +// stub to bypass validation + +package mysql + +import "context" + +type MySqlConnector struct{} + +func (MySqlConnector) Close() error { + return nil +} + +func (MySqlConnector) ConnectionActive(context.Context) error { + return nil +} diff --git a/nexus/Cargo.lock b/nexus/Cargo.lock index e53188476f..7d9188ff6d 100644 --- a/nexus/Cargo.lock +++ b/nexus/Cargo.lock @@ -39,6 +39,18 @@ dependencies = [ "version_check", ] +[[package]] +name = "ahash" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.1.3" @@ -48,6 +60,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" + [[package]] name = "analyzer" version = "0.1.0" @@ -151,9 +169,9 @@ checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" [[package]] name = "async-compression" -version = "0.4.8" +version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07dbbf24db18d609b1462965249abdf49129ccad073ec257da372adc83259c60" +checksum = "4e9eabd7a98fe442131a17c316bd9349c43695e49e730c3c8e12cfb5f4da2693" dependencies = [ "flate2", "futures-core", @@ -164,9 +182,9 @@ dependencies = [ [[package]] name = "async-recursion" -version = "1.1.0" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30c5ef0ede93efbf733c1a727f3b6b5a1060bbedd5600183e66f6e4be4af0ec5" +checksum = "3b43422f69d8ff38f95f1b2bb76517c91589a924d1559a0e935d7c8ce0274c11" dependencies = [ "proc-macro2", "quote", @@ -300,6 +318,26 @@ dependencies = [ "smallvec", ] +[[package]] +name = "bindgen" +version = "0.69.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a00dc851838a2120612785d195287475a3ac45514741da670b735818822129a0" +dependencies = [ + "bitflags 2.5.0", + "cexpr", + "clang-sys", + "itertools 0.12.1", + "lazy_static", + "lazycell", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "syn 2.0.60", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -366,6 +404,15 @@ dependencies = [ "syn_derive", ] +[[package]] +name = "btoi" +version = "0.4.3" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dd6407f73a9b8b6162d8a2ef999fe6afd7cc15902ebf42c5cd296addf17e0ad" +dependencies = [ + "num-traits", +] + [[package]] name = "bumpalo" version = "3.16.0" @@ -408,9 +455,9 @@ checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9" [[package]] name = "cargo-deb" -version = "2.1.0" +version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8978c203da69c3bd93d0e676477d9f4cf1df2a62e14fc4a68665259f94c6c0e2" +checksum = "769d6bfa0f70c0e584f46d6bfc78d094f708fa249f2b7b524ab6bed62eb3df36" dependencies = [ "ar", "cargo_toml", @@ -435,9 +482,9 @@ dependencies = [ [[package]] name = "cargo_toml" -version = "0.19.2" +version = "0.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a98356df42a2eb1bd8f1793ae4ee4de48e384dd974ce5eac8eee802edb7492be" +checksum = "c8cb1d556b8b8f36e5ca74938008be3ac102f5dcb5b68a0477e4249ae2291cd3" dependencies = [ "serde", "toml", @@ -477,6 +524,20 @@ name = "cc" version = "1.0.95" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d32a725bc159af97c3e629873bb9f88fb8cf8a4867175f76dc987815ea07c83b" +dependencies = [ + "jobserver", + "libc", + "once_cell", +] + +[[package]] +name = "cexpr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" +dependencies = [ + "nom", +] [[package]] name = "cfg-if" @@ -515,6 +576,17 @@ dependencies = [ "inout", ] +[[package]] +name = "clang-sys" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67523a3b4be3ce1989d607a828d036249522dd9c1c8de7f4dd2dae43a37369d1" +dependencies = [ + "glob", + "libc", + "libloading", +] + [[package]] name = "clap" version = "4.5.4" @@ -555,6 +627,15 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "98cc8fbded0c607b7ba9dd60cd98df59af97e84d24e49c8557331cfc26d301ce" +[[package]] +name = "cmake" +version = "0.1.50" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a31c789563b815f77f4250caee12365734369f942439b7defd71e18a48197130" +dependencies = [ + "cc", +] + [[package]] name = "colorchoice" version = "1.0.0" @@ -638,6 +719,19 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + [[package]] name = "crossbeam-channel" version = "0.5.12" @@ -666,6 +760,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df0346b5d5e76ac2fe4e327c5fd1118d6be7c51dfb18f9b7922923f287471e35" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.19" @@ -689,7 +792,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" dependencies = [ "cfg-if", - "hashbrown 0.14.3", + "hashbrown 0.14.5", "lock_api", "once_cell", "parking_lot_core", @@ -844,9 +947,9 @@ checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" [[package]] name = "fastrand" -version = "2.0.2" 
+version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "658bd65b1cf4c852a3cc96f18a8ce7b5640f6b703f905c7d74532294c2a63984" +checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a" [[package]] name = "filetime" @@ -856,7 +959,7 @@ checksum = "1ee447700ac8aa0b2f2bd7bc4462ad686ba06baa6727ac149a2d6277f0d240fd" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.4.1", "windows-sys 0.52.0", ] @@ -874,9 +977,9 @@ checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" [[package]] name = "flate2" -version = "1.0.28" +version = "1.0.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46303f565772937ffe1d394a4fac6f411c6013172fadde9dcdb1e147a086940e" +checksum = "5f54427cfd1c7829e2a139fcefea601bf088ebca651d2bf53ebc600eac295dae" dependencies = [ "crc32fast", "miniz_oxide", @@ -1006,15 +1109,14 @@ dependencies = [ [[package]] name = "gcp-bigquery-client" -version = "0.18.1" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7fe3895eb99784b8ad2776688b41e068e918d18ebb736dafb1a321bce46c749" +checksum = "ebc3e5c4b8a072ca074ab0d4f53dc6b04f45eb9bc0cc046a4a1428c8498af71e" dependencies = [ "async-stream", "async-trait", "dyn-clone", - "hyper 0.14.28", - "hyper-rustls 0.25.0", + "hyper 1.3.1", "log", "reqwest", "serde", @@ -1096,14 +1198,18 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" dependencies = [ - "ahash", + "ahash 0.7.8", ] [[package]] name = "hashbrown" -version = "0.14.3" +version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +dependencies = [ + "ahash 0.8.11", + "allocator-api2", +] [[package]] name = "hdrhistogram" @@ -1384,7 +1490,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26" dependencies = [ "equivalent", - "hashbrown 0.14.3", + "hashbrown 0.14.5", ] [[package]] @@ -1427,6 +1533,15 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" +[[package]] +name = "jobserver" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2b099aaa34a9751c5bf0878add70444e1ed2dd73f347be99003d4577277de6e" +dependencies = [ + "libc", +] + [[package]] name = "js-sys" version = "0.3.69" @@ -1451,6 +1566,15 @@ dependencies = [ "simple_asn1", ] +[[package]] +name = "keyed_priority_queue" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ee7893dab2e44ae5f9d0173f26ff4aa327c10b01b06a72b52dd9405b628640d" +dependencies = [ + "indexmap 2.2.6", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -1460,11 +1584,27 @@ dependencies = [ "spin 0.5.2", ] +[[package]] +name = "lazycell" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" + [[package]] name = "libc" -version = "0.2.153" +version = "0.2.154" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" +checksum = "ae743338b92ff9146ce83992f766a31066a91a8c84a45e0e9f21e7cf6de6d346" + +[[package]] +name = "libloading" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c2a198fb6b0eada2a8df47933734e6d35d350665a33a3593d7164fa52c75c19" +dependencies = [ + "cfg-if", + "windows-targets 0.52.5", +] [[package]] name = "libm" @@ -1480,9 +1620,9 @@ checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" [[package]] name = "lock_api" -version = "0.4.11" +version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c168f8615b12bc01f9c17e2eb0cc07dcae1940121185446edc3744920e8ef45" +checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" dependencies = [ "autocfg", "scopeguard", @@ -1494,6 +1634,15 @@ version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" +[[package]] +name = "lru" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3262e75e648fce39813cb56ac41f3c3e3f65217ebf3844d818d1f9398cfb0dc" +dependencies = [ + "hashbrown 0.14.5", +] + [[package]] name = "lzma-sys" version = "0.1.20" @@ -1570,6 +1719,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" dependencies = [ "libc", + "log", "wasi", "windows-sys 0.48.0", ] @@ -1580,6 +1730,78 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" +[[package]] +name = "mysql_async" +version = "0.34.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbfe87d7e35cb72363326216cc1712b865d8d4f70abf3b2d2e6b251fb6b2f427" +dependencies = [ + "bytes", + "crossbeam", + "flate2", + "futures-core", + "futures-sink", + "futures-util", + "keyed_priority_queue", + "lazy_static", + "lru", + "mio", + "mysql_common", + "once_cell", + "pem", + "percent-encoding", + "pin-project", + "rand", + "rustls 0.22.4", + "rustls-pemfile 2.1.2", + "serde", + "serde_json", + "socket2", + "thiserror", + "tokio", + "tokio-rustls 0.25.0", + "tokio-util", + "twox-hash", + "url", + "webpki", + "webpki-roots", +] + +[[package]] +name = "mysql_common" +version = "0.32.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ccdc1fe2bb3ef97e07ba4397327ed45509a1e2e499e2f8265243879cbc7313c" +dependencies = [ + "base64 0.21.7", + "bindgen", + "bitflags 2.5.0", + "btoi", + "byteorder", + "bytes", + "cc", + "chrono", + "cmake", + "crc32fast", + "flate2", + "lazy_static", + "num-bigint", + "num-traits", + "rand", + "regex", + "rust_decimal", + "saturating", + "serde", + "serde_json", + "sha1", + "sha2", + "smallvec", + "subprocess", + "thiserror", + "uuid", + "zstd", +] + [[package]] name = "nom" version = "7.1.3" @@ -1712,9 +1934,9 @@ checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" [[package]] name = "parking_lot" -version = "0.12.1" +version = "0.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +checksum = "7e4af0ca4f6caed20e900d564c242b8e5d4903fdacf31d3daf527b66fe6f42fb" dependencies = [ "lock_api", "parking_lot_core", @@ -1722,15 +1944,15 @@ dependencies = [ [[package]] 
name = "parking_lot_core" -version = "0.9.9" +version = "0.9.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e" +checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.5.1", "smallvec", - "windows-targets 0.48.5", + "windows-targets 0.52.5", ] [[package]] @@ -1780,6 +2002,16 @@ dependencies = [ "hmac", ] +[[package]] +name = "peer-ast" +version = "0.1.0" +dependencies = [ + "anyhow", + "pt", + "rust_decimal", + "sqlparser", +] + [[package]] name = "peer-bigquery" version = "0.1.0" @@ -1787,9 +2019,9 @@ dependencies = [ "anyhow", "async-trait", "chrono", - "dashmap", "futures", "gcp-bigquery-client", + "peer-ast", "peer-connections", "peer-cursor", "pgwire", @@ -1803,7 +2035,6 @@ dependencies = [ "tracing", "uuid", "value", - "yup-oauth2", ] [[package]] @@ -1825,10 +2056,37 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "dashmap", "futures", "pgwire", "sqlparser", "tokio", + "tracing", + "value", +] + +[[package]] +name = "peer-mysql" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "chrono", + "futures", + "mysql_async", + "peer-ast", + "peer-connections", + "peer-cursor", + "pgwire", + "pt", + "rust_decimal", + "serde", + "serde_bytes", + "serde_json", + "sqlparser", + "tokio", + "tokio-stream", + "tracing", "value", ] @@ -1869,7 +2127,6 @@ dependencies = [ "base64 0.22.0", "catalog", "chrono", - "dashmap", "futures", "hex", "jsonwebtoken", @@ -1925,6 +2182,7 @@ dependencies = [ "peer-bigquery", "peer-connections", "peer-cursor", + "peer-mysql", "peer-postgres", "peer-snowflake", "peerdb-parser", @@ -1933,7 +2191,7 @@ dependencies = [ "pt", "rand", "serde_json", - "sha256", + "similar", "sqlparser", "time", "tokio", @@ -1980,29 +2238,24 @@ dependencies = [ [[package]] name = "pgwire" -version = "0.19.2" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b3469097db38009e1cb3b92ecb66770aa8b623b97a2aff69afdd8b0dded397d" +checksum = "3770f56e1e8a608c6de40011b9a00c6b669c14d121024411701b4bc3b2a5be99" dependencies = [ "async-trait", - "base64 0.21.7", "bytes", "chrono", "derive-new", "futures", "hex", - "log", "md5", "postgres-types", "rand", "ring", - "stringprep", "thiserror", - "time", "tokio", - "tokio-rustls 0.25.0", + "tokio-rustls 0.26.0", "tokio-util", - "x509-certificate", ] [[package]] @@ -2403,6 +2656,15 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "redox_syscall" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "469052894dcb553421e483e4209ee581a45100d31b4018de03e5a7ad86374a7e" +dependencies = [ + "bitflags 2.5.0", +] + [[package]] name = "refinery" version = "0.8.14" @@ -2629,11 +2891,17 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rustix" -version = "0.38.33" +version = "0.38.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3cc72858054fcff6d7dea32df2aeaee6a7c24227366d7ea429aada2f26b16ad" +checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f" dependencies = 
[ "bitflags 2.5.0", "errno", @@ -2662,6 +2930,7 @@ version = "0.23.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "afabcee0551bd1aa3e18e5adbf2c0544722014b899adb31bd186ec638d3da97e" dependencies = [ + "log", "once_cell", "ring", "rustls-pki-types", @@ -2704,15 +2973,15 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.4.1" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ecd36cc4259e3e4514335c4a138c6b43171a8d61d8f5c9348f9fc7529416f247" +checksum = "beb461507cee2c2ff151784c52762cf4d9ff6a61f3e80968600ed24fa837fa54" [[package]] name = "rustls-webpki" -version = "0.102.2" +version = "0.102.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "faaa0a62740bedb9b2ef5afa303da42764c012f743917351dc9a237ea1663610" +checksum = "f3bce581c0dd41bce533ce695a1437fa16a7ab5ac3ccfa99fe1a620a7885eabf" dependencies = [ "ring", "rustls-pki-types", @@ -2749,6 +3018,12 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "saturating" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ece8e78b2f38ec51c51f5d475df0a7187ba5111b2a28bdc761ee05b075d40a71" + [[package]] name = "schannel" version = "0.1.23" @@ -2815,9 +3090,9 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.198" +version = "1.0.199" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9846a40c979031340571da2545a4e5b7c4163bdae79b301d5f86d03979451fcc" +checksum = "0c9f6e76df036c77cd94996771fb40db98187f096dd0b9af39c6c6e452ba966a" dependencies = [ "serde_derive", ] @@ -2833,9 +3108,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.198" +version = "1.0.199" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e88edab869b01783ba905e7d0153f9fc1a6505a96e4ad3018011eedb838566d9" +checksum = "11bd257a6541e141e42ca6d24ae26f7714887b47e89aa739099104c7e4d3b7fc" dependencies = [ "proc-macro2", "quote", @@ -2875,10 +3150,10 @@ dependencies = [ ] [[package]] -name = "sha2" -version = "0.10.8" +name = "sha1" +version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" dependencies = [ "cfg-if", "cpufeatures", @@ -2886,16 +3161,14 @@ dependencies = [ ] [[package]] -name = "sha256" -version = "1.5.0" +name = "sha2" +version = "0.10.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18278f6a914fa3070aa316493f7d2ddfb9ac86ebc06fa3b83bffda487e9065b0" +checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" dependencies = [ - "async-trait", - "bytes", - "hex", - "sha2", - "tokio", + "cfg-if", + "cpufeatures", + "digest", ] [[package]] @@ -2907,6 +3180,12 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "signal-hook-registry" version = "1.4.2" @@ -2938,6 +3217,12 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a" +[[package]] +name = "similar" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"fa42c91313f1d05da9b26f267f931cf178d4aba455b4c4622dd7355eb80c6640" + [[package]] name = "simple_asn1" version = "0.6.2" @@ -2979,9 +3264,9 @@ checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" [[package]] name = "socket2" -version = "0.5.6" +version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05ffd9c0a93b7543e062e759284fcf5f5e3b098501104bfbdde4d404db792871" +checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c" dependencies = [ "libc", "windows-sys 0.52.0", @@ -3011,8 +3296,8 @@ dependencies = [ [[package]] name = "sqlparser" -version = "0.41.0" -source = "git+https://github.com/peerdb-io/sqlparser-rs.git?branch=ps-es-parsing#e86bcd9ed3dd9ac7be94569f0ae1fc20c440dafa" +version = "0.45.0" +source = "git+https://github.com/peerdb-io/sqlparser-rs.git?branch=main#d76527bd14ceb5f0b739feb37a8912a9ad7f864c" dependencies = [ "log", "sqlparser_derive", @@ -3020,14 +3305,20 @@ dependencies = [ [[package]] name = "sqlparser_derive" -version = "0.2.1" -source = "git+https://github.com/peerdb-io/sqlparser-rs.git?branch=ps-es-parsing#e86bcd9ed3dd9ac7be94569f0ae1fc20c440dafa" +version = "0.2.2" +source = "git+https://github.com/peerdb-io/sqlparser-rs.git?branch=main#d76527bd14ceb5f0b739feb37a8912a9ad7f864c" dependencies = [ "proc-macro2", "quote", "syn 2.0.60", ] +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "stringprep" version = "0.1.4" @@ -3045,6 +3336,16 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" +[[package]] +name = "subprocess" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c2e86926081dda636c546d8c5e641661049d7562a68f5488be4a1f7f66f6086" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "subtle" version = "2.5.0" @@ -3367,7 +3668,7 @@ dependencies = [ "serde", "serde_spanned", "toml_datetime", - "winnow 0.6.6", + "winnow 0.6.7", ] [[package]] @@ -3562,6 +3863,17 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "twox-hash" +version = "1.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" +dependencies = [ + "cfg-if", + "rand", + "static_assertions", +] + [[package]] name = "typed-arena" version = "2.0.2" @@ -3597,9 +3909,9 @@ dependencies = [ [[package]] name = "unicode-width" -version = "0.1.11" +version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e51733f11c9c4f72aa0c160008246859e340b00807569a0da0e7a1079b27ba85" +checksum = "68f5e5f3158ecfd4b8ff6fe086db7c8467a2dfdac97fe420f2b7c4aa97af66d6" [[package]] name = "untrusted" @@ -3609,11 +3921,11 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" [[package]] name = "ureq" -version = "2.9.6" +version = "2.9.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11f214ce18d8b2cbe84ed3aa6486ed3f5b285cf8d8fbdbce9f3f767a724adc35" +checksum = "d11a831e3c0b56e438a28308e7c810799e3c118417f342d30ecec080105395cd" dependencies = [ - "base64 0.21.7", + "base64 0.22.0", "encoding_rs", 
"flate2", "log", @@ -3797,6 +4109,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki" +version = "0.22.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed63aea5ce73d0ff405984102c42de94fc55a6b75765d621c65262469b3c9b53" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "webpki-roots" version = "0.26.1" @@ -3812,7 +4134,7 @@ version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a44ab49fad634e88f55bf8f9bb3abd2f27d7204172a112c7c9987e01c1c94ea9" dependencies = [ - "redox_syscall", + "redox_syscall 0.4.1", "wasite", "web-sys", ] @@ -3835,11 +4157,11 @@ checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" [[package]] name = "winapi-util" -version = "0.1.6" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f29e6f9198ba0d26b4c9f07dbe6f9ed633e1f3d5b8b414090084349e46a52596" +checksum = "4d4cc384e1e73b93bafa6fb4f1df8c41695c8a91cf9c4c64358067d15a7b6c6b" dependencies = [ - "winapi", + "windows-sys 0.52.0", ] [[package]] @@ -4007,9 +4329,9 @@ dependencies = [ [[package]] name = "winnow" -version = "0.6.6" +version = "0.6.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0c976aaaa0e1f90dbb21e9587cdaf1d9679a1cde8875c0d6bd83ab96a208352" +checksum = "14b9415ee827af173ebb3f15f9083df5a122eb93572ec28741fb153356ea2578" dependencies = [ "memchr", ] @@ -4063,9 +4385,9 @@ dependencies = [ [[package]] name = "yup-oauth2" -version = "8.3.3" +version = "9.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45b7ff561fdc7809a2adad8bce73e157d01129074098e6405d0d7dfa2d087782" +checksum = "f75463c432f5d4ca9c75047514df3d768f8ac3276ac22c9a6531af6d0a3da7ee" dependencies = [ "anyhow", "async-trait", @@ -4088,6 +4410,26 @@ dependencies = [ "url", ] +[[package]] +name = "zerocopy" +version = "0.7.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74d4d3961e53fa4c9a25a8637fc2bfaf2595b3d3ae34875568a5cf64787716be" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.60", +] + [[package]] name = "zeroize" version = "1.7.0" @@ -4119,3 +4461,31 @@ dependencies = [ "simd-adler32", "typed-arena", ] + +[[package]] +name = "zstd" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d789b1514203a1120ad2429eae43a7bd32b90976a7bb8a05f7ec02fa88cc23a" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "7.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cd99b45c6bc03a018c8b8a86025678c87e55526064e38f9df301989dce7ec0a" +dependencies = [ + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.10+zstd.1.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c253a4914af5bafc8fa8c86ee400827e83cf6ec01195ec1f1ed8441bf00d65aa" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/nexus/Cargo.toml b/nexus/Cargo.toml index 3e4a6a062d..858243453a 100644 --- a/nexus/Cargo.toml +++ b/nexus/Cargo.toml @@ -4,9 +4,11 @@ members = [ "catalog", "flow-rs", "parser", + "peer-ast", "peer-bigquery", "peer-connections", "peer-cursor", + "peer-mysql", "peer-postgres", "peer-snowflake", 
"postgres-connection", @@ -18,6 +20,16 @@ members = [ resolver = "2" [workspace.dependencies] -chrono = { version = "0.4", default-features = false, features = ["serde", "std"] } -sqlparser = { git = "https://github.com/peerdb-io/sqlparser-rs.git", branch = "ps-es-parsing" } -pgwire = "0.19" +chrono = { version = "0.4", default-features = false, features = [ + "serde", + "std", +] } +dashmap = "5.0" +rust_decimal = { version = "1", default-features = false, features = [ + "tokio-pg", +] } +sqlparser = { git = "https://github.com/peerdb-io/sqlparser-rs.git", branch = "main" } +tracing = "0.1" +pgwire = { version = "0.22", default-features = false, features = [ + "server-api-ring", +] } diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs index bf4722e596..0642966713 100644 --- a/nexus/analyzer/src/lib.rs +++ b/nexus/analyzer/src/lib.rs @@ -3,7 +3,6 @@ use std::{ collections::{HashMap, HashSet}, ops::ControlFlow, - vec, }; use anyhow::Context; @@ -16,8 +15,11 @@ use pt::{ }, }; use qrep::process_options; -use sqlparser::ast::CreateMirror::{Select, CDC}; -use sqlparser::ast::{visit_relations, visit_statements, FetchDirection, SqlOption, Statement}; +use sqlparser::ast::{ + self, visit_relations, visit_statements, + CreateMirror::{Select, CDC}, + Expr, FetchDirection, SqlOption, Statement, +}; mod qrep; @@ -51,26 +53,38 @@ impl<'a> StatementAnalyzer for PeerExistanceAnalyzer<'a> { fn analyze(&self, statement: &Statement) -> anyhow::Result { let mut peers_touched: HashSet = HashSet::new(); + let mut analyze_name = |name: &str| { + let name = name.to_lowercase(); + if self.peers.contains_key(&name) { + peers_touched.insert(name); + } + }; - // This is necessary as visit relations was not visiting drop table's object names, - // causing DROP commands for Postgres peer being interpreted as - // catalog queries. + // Necessary as visit_relations fails to deeply visit some structures. visit_statements(statement, |stmt| { - if let &Statement::Drop { names, .. } = &stmt { - for name in names { - let peer_name = name.0[0].value.to_lowercase(); - if self.peers.contains_key(&peer_name) { - peers_touched.insert(peer_name); + match stmt { + Statement::Drop { names, .. 
} => { + for name in names { + analyze_name(&name.0[0].value); + } + } + Statement::Declare { stmts } => { + for stmt in stmts { + if let Some(ref query) = stmt.for_query { + visit_relations(query, |relation| { + analyze_name(&relation.0[0].value); + ControlFlow::<()>::Continue(()) + }); + } } } + _ => (), } ControlFlow::<()>::Continue(()) }); + visit_relations(statement, |relation| { - let peer_name = relation.0[0].value.to_lowercase(); - if self.peers.contains_key(&peer_name) { - peers_touched.insert(peer_name); - } + analyze_name(&relation.0[0].value); ControlFlow::<()>::Continue(()) }); @@ -162,9 +176,10 @@ impl StatementAnalyzer for PeerDDLAnalyzer { } => { match create_mirror { CDC(cdc) => { - let mut flow_job_table_mappings = vec![]; - for table_mapping in &cdc.mapping_options { - flow_job_table_mappings.push(FlowJobTableMapping { + let flow_job_table_mappings = cdc + .mapping_options + .iter() + .map(|table_mapping| FlowJobTableMapping { source_table_identifier: table_mapping.source.to_string(), destination_table_identifier: table_mapping.destination.to_string(), partition_key: table_mapping @@ -176,8 +191,8 @@ impl StatementAnalyzer for PeerDDLAnalyzer { .as_ref() .map(|ss| ss.iter().map(|s| s.value.clone()).collect()) .unwrap_or_default(), - }); - } + }) + .collect::>(); // get do_initial_copy from with_options let mut raw_options = HashMap::with_capacity(cdc.with_options.len()); @@ -185,9 +200,9 @@ impl StatementAnalyzer for PeerDDLAnalyzer { raw_options.insert(&option.name.value as &str, &option.value); } let do_initial_copy = match raw_options.remove("do_initial_copy") { - Some(sqlparser::ast::Value::Boolean(b)) => *b, + Some(Expr::Value(ast::Value::Boolean(b))) => *b, // also support "true" and "false" as strings - Some(sqlparser::ast::Value::SingleQuotedString(s)) => { + Some(Expr::Value(ast::Value::SingleQuotedString(s))) => { match s.as_ref() { "true" => true, "false" => false, @@ -203,9 +218,9 @@ impl StatementAnalyzer for PeerDDLAnalyzer { // bool resync true or false, default to false if not in opts let resync = match raw_options.remove("resync") { - Some(sqlparser::ast::Value::Boolean(b)) => *b, + Some(Expr::Value(ast::Value::Boolean(b))) => *b, // also support "true" and "false" as strings - Some(sqlparser::ast::Value::SingleQuotedString(s)) => { + Some(Expr::Value(ast::Value::SingleQuotedString(s))) => { match s.as_ref() { "true" => true, "false" => false, @@ -218,99 +233,99 @@ impl StatementAnalyzer for PeerDDLAnalyzer { let publication_name: Option = match raw_options .remove("publication_name") { - Some(sqlparser::ast::Value::SingleQuotedString(s)) => Some(s.clone()), + Some(Expr::Value(ast::Value::SingleQuotedString(s))) => Some(s.clone()), _ => None, }; let replication_slot_name: Option = match raw_options .remove("replication_slot_name") { - Some(sqlparser::ast::Value::SingleQuotedString(s)) => Some(s.clone()), + Some(Expr::Value(ast::Value::SingleQuotedString(s))) => Some(s.clone()), _ => None, }; let snapshot_num_rows_per_partition: Option = match raw_options .remove("snapshot_num_rows_per_partition") { - Some(sqlparser::ast::Value::Number(n, _)) => Some(n.parse::()?), + Some(Expr::Value(ast::Value::Number(n, _))) => Some(n.parse::()?), _ => None, }; let snapshot_num_tables_in_parallel: Option = match raw_options .remove("snapshot_num_tables_in_parallel") { - Some(sqlparser::ast::Value::Number(n, _)) => Some(n.parse::()?), + Some(Expr::Value(ast::Value::Number(n, _))) => Some(n.parse::()?), _ => None, }; let snapshot_staging_path = match 
raw_options.remove("snapshot_staging_path") { - Some(sqlparser::ast::Value::SingleQuotedString(s)) => s.clone(), + Some(Expr::Value(ast::Value::SingleQuotedString(s))) => s.clone(), _ => String::new(), }; let snapshot_max_parallel_workers: Option = match raw_options .remove("snapshot_max_parallel_workers") { - Some(sqlparser::ast::Value::Number(n, _)) => Some(n.parse::()?), + Some(Expr::Value(ast::Value::Number(n, _))) => Some(n.parse::()?), _ => None, }; let cdc_staging_path = match raw_options.remove("cdc_staging_path") { - Some(sqlparser::ast::Value::SingleQuotedString(s)) => Some(s.clone()), + Some(Expr::Value(ast::Value::SingleQuotedString(s))) => Some(s.clone()), _ => Some("".to_string()), }; let soft_delete = match raw_options.remove("soft_delete") { - Some(sqlparser::ast::Value::Boolean(b)) => *b, + Some(Expr::Value(ast::Value::Boolean(b))) => *b, _ => false, }; let push_parallelism: Option = match raw_options .remove("push_parallelism") { - Some(sqlparser::ast::Value::Number(n, _)) => Some(n.parse::()?), + Some(Expr::Value(ast::Value::Number(n, _))) => Some(n.parse::()?), _ => None, }; let push_batch_size: Option = match raw_options .remove("push_batch_size") { - Some(sqlparser::ast::Value::Number(n, _)) => Some(n.parse::()?), + Some(Expr::Value(ast::Value::Number(n, _))) => Some(n.parse::()?), _ => None, }; let max_batch_size: Option = match raw_options.remove("max_batch_size") { - Some(sqlparser::ast::Value::Number(n, _)) => Some(n.parse::()?), + Some(Expr::Value(ast::Value::Number(n, _))) => Some(n.parse::()?), _ => None, }; let soft_delete_col_name: Option = match raw_options .remove("soft_delete_col_name") { - Some(sqlparser::ast::Value::SingleQuotedString(s)) => Some(s.clone()), + Some(Expr::Value(ast::Value::SingleQuotedString(s))) => Some(s.clone()), _ => None, }; let synced_at_col_name: Option = match raw_options .remove("synced_at_col_name") { - Some(sqlparser::ast::Value::SingleQuotedString(s)) => Some(s.clone()), + Some(Expr::Value(ast::Value::SingleQuotedString(s))) => Some(s.clone()), _ => None, }; let initial_copy_only = match raw_options.remove("initial_copy_only") { - Some(sqlparser::ast::Value::Boolean(b)) => *b, + Some(Expr::Value(ast::Value::Boolean(b))) => *b, _ => false, }; let script = match raw_options.remove("script") { - Some(sqlparser::ast::Value::SingleQuotedString(s)) => s.clone(), + Some(Expr::Value(ast::Value::SingleQuotedString(s))) => s.clone(), _ => String::new(), }; let system = match raw_options.remove("system") { - Some(sqlparser::ast::Value::SingleQuotedString(s)) => s.clone(), + Some(Expr::Value(ast::Value::SingleQuotedString(s))) => s.clone(), _ => "Q".to_string(), }; @@ -352,15 +367,15 @@ impl StatementAnalyzer for PeerDDLAnalyzer { Select(select) => { let mut raw_options = HashMap::with_capacity(select.with_options.len()); for option in &select.with_options { - raw_options.insert(&option.name.value as &str, &option.value); + if let Expr::Value(ref value) = option.value { + raw_options.insert(&option.name.value as &str, value); + } } // we treat disabled as a special option, and do not pass it to the // flow server, this is primarily used for external orchestration. 
let mut disabled = false; - if let Some(sqlparser::ast::Value::Boolean(b)) = - raw_options.remove("disabled") - { + if let Some(ast::Value::Boolean(b)) = raw_options.remove("disabled") { disabled = *b; } @@ -411,7 +426,7 @@ impl StatementAnalyzer for PeerDDLAnalyzer { } let query_string = match raw_options.remove("query_string") { - Some(sqlparser::ast::Value::SingleQuotedString(s)) => Some(s.clone()), + Some(Expr::Value(ast::Value::SingleQuotedString(s))) => Some(s.clone()), _ => None, }; @@ -468,12 +483,14 @@ impl StatementAnalyzer for PeerCursorAnalyzer { name, direction, .. } => { let count = match direction { + FetchDirection::ForwardAll | FetchDirection::All => usize::MAX, + FetchDirection::Next | FetchDirection::Forward { limit: None } => 1, FetchDirection::Count { - limit: sqlparser::ast::Value::Number(n, _), + limit: ast::Value::Number(n, _), } | FetchDirection::Forward { - limit: Some(sqlparser::ast::Value::Number(n, _)), - } => n.parse::(), + limit: Some(ast::Value::Number(n, _)), + } => n.parse::()?, _ => { return Err(anyhow::anyhow!( "invalid fetch direction for cursor: {:?}", @@ -481,11 +498,11 @@ impl StatementAnalyzer for PeerCursorAnalyzer { )) } }; - Ok(Some(CursorEvent::Fetch(name.value.clone(), count?))) + Ok(Some(CursorEvent::Fetch(name.value.clone(), count))) } Statement::Close { cursor } => match cursor { - sqlparser::ast::CloseCursor::All => Ok(Some(CursorEvent::CloseAll)), - sqlparser::ast::CloseCursor::Specific { name } => { + ast::CloseCursor::All => Ok(Some(CursorEvent::CloseAll)), + ast::CloseCursor::Specific { name } => { Ok(Some(CursorEvent::Close(name.to_string()))) } }, @@ -498,15 +515,10 @@ fn parse_db_options(db_type: DbType, with_options: &[SqlOption]) -> anyhow::Resu let mut opts: HashMap<&str, &str> = HashMap::with_capacity(with_options.len()); for opt in with_options { let val = match opt.value { - sqlparser::ast::Value::SingleQuotedString(ref str) => str, - sqlparser::ast::Value::Number(ref v, _) => v, - sqlparser::ast::Value::Boolean(v) => { - if v { - "true" - } else { - "false" - } - } + Expr::Value(ast::Value::SingleQuotedString(ref str)) => str, + Expr::Value(ast::Value::Number(ref v, _)) => v, + Expr::Value(ast::Value::Boolean(true)) => "true", + Expr::Value(ast::Value::Boolean(false)) => "false", _ => panic!("invalid option type for peer"), }; opts.insert(&opt.name.value, val); @@ -903,5 +915,40 @@ fn parse_db_options(db_type: DbType, with_options: &[SqlOption]) -> anyhow::Resu }) } } + DbType::Mysql => Config::MysqlConfig(pt::peerdb_peers::MySqlConfig { + host: opts.get("host").context("no host specified")?.to_string(), + port: opts + .get("port") + .context("no port specified")? 
+ .parse::() + .context("unable to parse port as valid int")?, + user: opts + .get("user") + .cloned() + .unwrap_or_default() + .to_string(), + password: opts + .get("password") + .cloned() + .unwrap_or_default() + .to_string(), + database: opts + .get("database") + .cloned() + .unwrap_or_default() + .to_string(), + setup: opts + .get("setup") + .map(|s| s.split(';').map(String::from).collect::>()) + .unwrap_or_default(), + compression: opts + .get("compression") + .and_then(|s| s.parse::().ok()) + .unwrap_or_default(), + disable_tls: opts + .get("disable_tls") + .and_then(|s| s.parse::().ok()) + .unwrap_or_default(), + }), })) } diff --git a/nexus/analyzer/src/qrep.rs b/nexus/analyzer/src/qrep.rs index 14bdca8713..692b093d94 100644 --- a/nexus/analyzer/src/qrep.rs +++ b/nexus/analyzer/src/qrep.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use serde_json::Value; -use sqlparser::ast::Value as SqlValue; +use sqlparser::ast; enum QRepOptionType { String { @@ -96,7 +96,7 @@ const QREP_OPTIONS: &[QRepOptionType] = &[ ]; pub fn process_options( - mut raw_opts: HashMap<&str, &SqlValue>, + mut raw_opts: HashMap<&str, &ast::Value>, ) -> anyhow::Result> { let mut opts: HashMap = HashMap::new(); @@ -109,7 +109,7 @@ pub fn process_options( accepted_values, } => { if let Some(raw_value) = raw_opts.remove(*name) { - if let SqlValue::SingleQuotedString(str) = raw_value { + if let ast::Value::SingleQuotedString(str) = raw_value { if let Some(values) = accepted_values { if !values.contains(&str.as_str()) { anyhow::bail!("{} must be one of {:?}", name, values); @@ -132,7 +132,7 @@ pub fn process_options( required, } => { if let Some(raw_value) = raw_opts.remove(*name) { - if let SqlValue::Number(num_str, _) = raw_value { + if let ast::Value::Number(num_str, _) = raw_value { let num = num_str.parse::()?; if let Some(min) = min_value { if num < *min { @@ -153,7 +153,7 @@ pub fn process_options( QRepOptionType::StringArray { name } => { // read it as a string and split on comma if let Some(raw_value) = raw_opts.remove(*name) { - if let SqlValue::SingleQuotedString(str) = raw_value { + if let ast::Value::SingleQuotedString(str) = raw_value { let values: Vec = str .split(',') .map(|s| Value::String(s.trim().to_string())) @@ -170,7 +170,7 @@ pub fn process_options( required, } => { if let Some(raw_value) = raw_opts.remove(*name) { - if let SqlValue::Boolean(b) = raw_value { + if let ast::Value::Boolean(b) = raw_value { opts.insert(name.to_string(), Value::Bool(*b)); } else { anyhow::bail!("Invalid value for {}", name); diff --git a/nexus/catalog/src/lib.rs b/nexus/catalog/src/lib.rs index 2c53000759..0c3b65d2b8 100644 --- a/nexus/catalog/src/lib.rs +++ b/nexus/catalog/src/lib.rs @@ -87,13 +87,12 @@ impl Catalog { pub async fn create_peer(&self, peer: &Peer) -> anyhow::Result { let config_blob = { - let config = peer.config.clone().context("invalid peer config")?; + let config = peer.config.as_ref().context("invalid peer config")?; match config { Config::SnowflakeConfig(snowflake_config) => snowflake_config.encode_to_vec(), Config::BigqueryConfig(bigquery_config) => bigquery_config.encode_to_vec(), Config::MongoConfig(mongo_config) => mongo_config.encode_to_vec(), Config::PostgresConfig(postgres_config) => postgres_config.encode_to_vec(), - Config::EventhubConfig(eventhub_config) => eventhub_config.encode_to_vec(), Config::S3Config(s3_config) => s3_config.encode_to_vec(), Config::SqlserverConfig(sqlserver_config) => sqlserver_config.encode_to_vec(), Config::EventhubGroupConfig(eventhub_group_config) => { @@ 
-105,6 +104,7 @@ impl Catalog { Config::ElasticsearchConfig(elasticsearch_config) => { elasticsearch_config.encode_to_vec() } + Config::MysqlConfig(mysql_config) => mysql_config.encode_to_vec(), } }; @@ -321,6 +321,11 @@ impl Catalog { pt::peerdb_peers::ElasticsearchConfig::decode(options).with_context(err)?; Config::ElasticsearchConfig(elasticsearch_config) } + DbType::Mysql => { + let mysql_config = + pt::peerdb_peers::MySqlConfig::decode(options).with_context(err)?; + Config::MysqlConfig(mysql_config) + } }) } else { None diff --git a/nexus/flow-rs/Cargo.toml b/nexus/flow-rs/Cargo.toml index c46c0ee842..5ae9cda2d9 100644 --- a/nexus/flow-rs/Cargo.toml +++ b/nexus/flow-rs/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" [dependencies] serde_json = "1.0" anyhow = "1.0" -tracing = "0.1" +tracing.workspace = true tonic-health = "0.11" pt = { path = "../pt" } catalog = { path = "../catalog" } diff --git a/nexus/parser/Cargo.toml b/nexus/parser/Cargo.toml index 8c79ac4a18..b6aac7d88b 100644 --- a/nexus/parser/Cargo.toml +++ b/nexus/parser/Cargo.toml @@ -15,4 +15,4 @@ pt = { path = "../pt" } rand = "0.8" sqlparser.workspace = true tokio = { version = "1", features = ["full"] } -tracing = "0.1" +tracing.workspace = true diff --git a/nexus/parser/src/lib.rs b/nexus/parser/src/lib.rs index deb16af505..4f5cad356a 100644 --- a/nexus/parser/src/lib.rs +++ b/nexus/parser/src/lib.rs @@ -33,6 +33,9 @@ pub enum NexusStatement { stmt: Statement, cursor: CursorEvent, }, + Rollback { + stmt: Statement, + }, Empty, } @@ -41,16 +44,13 @@ impl NexusStatement { peers: HashMap, stmt: &Statement, ) -> PgWireResult { - let ddl = { - let pdl: PeerDDLAnalyzer = PeerDDLAnalyzer; - pdl.analyze(stmt).map_err(|e| { - PgWireError::UserError(Box::new(ErrorInfo::new( - "ERROR".to_owned(), - "internal_error".to_owned(), - e.to_string(), - ))) - }) - }?; + let ddl = PeerDDLAnalyzer.analyze(stmt).map_err(|e| { + PgWireError::UserError(Box::new(ErrorInfo::new( + "ERROR".to_owned(), + "internal_error".to_owned(), + e.to_string(), + ))) + })?; if let Some(ddl) = ddl { return Ok(NexusStatement::PeerDDL { @@ -59,8 +59,7 @@ impl NexusStatement { }); } - let peer_cursor: PeerCursorAnalyzer = Default::default(); - if let Ok(Some(cursor)) = peer_cursor.analyze(stmt) { + if let Ok(Some(cursor)) = PeerCursorAnalyzer.analyze(stmt) { return Ok(NexusStatement::PeerCursor { stmt: stmt.clone(), cursor, @@ -126,12 +125,19 @@ impl NexusQueryParser { }) } else { let stmt = stmts.remove(0); - let peers = self.get_peers_bridge().await?; - let nexus_stmt = NexusStatement::new(peers, &stmt)?; - Ok(NexusParsedStatement { - statement: nexus_stmt, - query: sql.to_owned(), - }) + if matches!(stmt, Statement::Rollback { .. 
}) { + Ok(NexusParsedStatement { + statement: NexusStatement::Rollback { stmt }, + query: sql.to_owned(), + }) + } else { + let peers = self.get_peers_bridge().await?; + let nexus_stmt = NexusStatement::new(peers, &stmt)?; + Ok(NexusParsedStatement { + statement: nexus_stmt, + query: sql.to_owned(), + }) + } } } } diff --git a/nexus/peer-ast/Cargo.toml b/nexus/peer-ast/Cargo.toml new file mode 100644 index 0000000000..91264b6a41 --- /dev/null +++ b/nexus/peer-ast/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "peer-ast" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = "1.0" +pt = { path = "../pt" } +rust_decimal.workspace = true +sqlparser.workspace = true diff --git a/nexus/peer-ast/src/lib.rs b/nexus/peer-ast/src/lib.rs new file mode 100644 index 0000000000..cab1728999 --- /dev/null +++ b/nexus/peer-ast/src/lib.rs @@ -0,0 +1,101 @@ +use sqlparser::ast::{Array, ArrayElemTypeDef, DataType, Expr}; + +/// Flatten Cast EXPR to List with right value type +/// For example Value(SingleQuotedString("{hash1,hash2}") must return +/// a vector Value(SingleQuotedString("hash1"), Value(SingleQuotedString("hash2"))) +pub fn flatten_expr_to_in_list(expr: &Expr) -> anyhow::Result> { + let mut list = vec![]; + // check if expr is of type Cast + if let Expr::Cast { + expr, data_type, .. + } = expr + { + // assert that expr is of type SingleQuotedString + if let Expr::Value(sqlparser::ast::Value::SingleQuotedString(s)) = expr.as_ref() { + // trim the starting and ending curly braces + let s = s.trim_start_matches('{').trim_end_matches('}'); + // split string by comma + let split = s.split(','); + // match on data type, and create a vector of Expr::Value + match data_type { + DataType::Array(ArrayElemTypeDef::AngleBracket(inner)) + | DataType::Array(ArrayElemTypeDef::SquareBracket(inner)) => match inner.as_ref() { + DataType::Text | DataType::Char(_) | DataType::Varchar(_) => { + for s in split { + list.push(Expr::Value(sqlparser::ast::Value::SingleQuotedString( + s.to_string(), + ))); + } + } + DataType::Integer(_) + | DataType::Float(_) + | DataType::BigInt(_) + | DataType::UnsignedBigInt(_) + | DataType::UnsignedInteger(_) + | DataType::UnsignedSmallInt(_) + | DataType::UnsignedTinyInt(_) + | DataType::TinyInt(_) + | DataType::UnsignedInt(_) => { + for s in split { + list.push(Expr::Value(sqlparser::ast::Value::Number( + s.to_string(), + false, + ))); + } + } + _ => { + return Err(anyhow::anyhow!( + "Unsupported inner data type for IN list: {:?}", + data_type + )) + } + }, + _ => { + return Err(anyhow::anyhow!( + "Unsupported data type for IN list: {:?}", + data_type + )) + } + } + } else if let Expr::Array(arr) = expr.as_ref() { + list = pour_array_into_list(arr, list).expect("Failed to transfer array to list"); + } + } else if let Expr::Array(arr) = expr { + list = pour_array_into_list(arr, list).expect("Failed to transfer array to list"); + } + + Ok(list) +} + +fn pour_array_into_list(arr: &Array, mut list: Vec) -> anyhow::Result> { + for element in &arr.elem { + match &element { + Expr::Value(val) => match val { + sqlparser::ast::Value::Number(_, _) => { + list.push(Expr::Value(sqlparser::ast::Value::Number( + element.to_string(), + false, + ))); + } + sqlparser::ast::Value::SingleQuotedString(_) => { + list.push(Expr::Value(sqlparser::ast::Value::UnQuotedString( + element.to_string(), + ))); + } + _ => { + return Err(anyhow::anyhow!( + "Unsupported data type for IN list: {:?}", + val + 
)) + } + }, + _ => { + return Err(anyhow::anyhow!( + "Unsupported element for IN list: {:?}", + element + )) + } + } + } + Ok(list) +} diff --git a/nexus/peer-bigquery/Cargo.toml b/nexus/peer-bigquery/Cargo.toml index 6626bb9840..94670fe4f6 100644 --- a/nexus/peer-bigquery/Cargo.toml +++ b/nexus/peer-bigquery/Cargo.toml @@ -9,20 +9,19 @@ edition = "2021" anyhow = "1.0" async-trait = "0.1" chrono.workspace = true -dashmap = "5.0" futures = { version = "0.3.28", features = ["executor"] } +peer-ast = { path = "../peer-ast" } peer-cursor = { path = "../peer-cursor" } peer-connections = { path = "../peer-connections" } pgwire.workspace = true pt = { path = "../pt" } -rust_decimal = { version = "1.30.0", features = [ "tokio-pg" ] } +rust_decimal.workspace = true serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_bytes = "0.11" sqlparser.workspace = true -tracing = "0.1" +tracing.workspace = true tokio = { version = "1.0", features = ["full"] } -gcp-bigquery-client = "0.18" +gcp-bigquery-client = "0.20" uuid = { version = "1.0", features = ["serde", "v4"] } value = { path = "../value" } -yup-oauth2 = "8.3.3" diff --git a/nexus/peer-bigquery/src/ast.rs b/nexus/peer-bigquery/src/ast.rs index 8429e0ebe1..fd8153382a 100644 --- a/nexus/peer-bigquery/src/ast.rs +++ b/nexus/peer-bigquery/src/ast.rs @@ -1,15 +1,15 @@ use std::ops::ControlFlow; +use peer_ast::flatten_expr_to_in_list; use sqlparser::ast::Value::Number; use sqlparser::ast::{ visit_expressions_mut, visit_function_arg_mut, visit_relations_mut, visit_setexpr_mut, Array, - ArrayElemTypeDef, BinaryOperator, DataType, DateTimeField, Expr, Function, FunctionArg, - FunctionArgExpr, Ident, ObjectName, Query, SetExpr, SetOperator, SetQuantifier, TimezoneInfo, + BinaryOperator, DataType, DateTimeField, Expr, Function, FunctionArg, FunctionArgExpr, Ident, + ObjectName, Query, SetExpr, SetOperator, SetQuantifier, TimezoneInfo, }; -#[derive(Default)] -pub struct BigqueryAst {} +pub struct BigqueryAst; impl BigqueryAst { pub fn is_timestamp_returning_function(&self, name: &str) -> bool { @@ -82,9 +82,8 @@ impl BigqueryAst { .. 
} = arg_expr { - let list = self - .flatten_expr_to_in_list(arg_expr) - .expect("failed to flatten in function"); + let list = + flatten_expr_to_in_list(arg_expr).expect("failed to flatten in function"); let rewritten_array = Array { elem: list, named: true, @@ -242,9 +241,7 @@ impl BigqueryAst { } = node { if matches!(compare_op, BinaryOperator::Eq | BinaryOperator::NotEq) { - let list = self - .flatten_expr_to_in_list(right) - .expect("failed to flatten"); + let list = flatten_expr_to_in_list(right).expect("failed to flatten"); *node = Expr::InList { expr: left.clone(), list, @@ -258,109 +255,4 @@ impl BigqueryAst { Ok(()) } - - fn pour_array_into_list(&self, arr: &Array, mut list: Vec) -> anyhow::Result> { - for element in &arr.elem { - match &element { - Expr::Value(val) => match val { - sqlparser::ast::Value::Number(_, _) => { - list.push(Expr::Value(sqlparser::ast::Value::Number( - element.to_string(), - false, - ))); - } - sqlparser::ast::Value::SingleQuotedString(_) => { - list.push(Expr::Value(sqlparser::ast::Value::UnQuotedString( - element.to_string(), - ))); - } - _ => { - return Err(anyhow::anyhow!( - "Unsupported data type for IN list: {:?}", - val - )) - } - }, - _ => { - return Err(anyhow::anyhow!( - "Unsupported element for IN list: {:?}", - element - )) - } - } - } - Ok(list) - } - /// Flatten Cast EXPR to List with right value type - /// For example Value(SingleQuotedString("{hash1,hash2}") must return - /// a vector Value(SingleQuotedString("hash1"), Value(SingleQuotedString("hash2"))) - fn flatten_expr_to_in_list(&self, expr: &Expr) -> anyhow::Result> { - let mut list = vec![]; - // check if expr is of type Cast - if let Expr::Cast { - expr, data_type, .. - } = expr - { - // assert that expr is of type SingleQuotedString - if let Expr::Value(sqlparser::ast::Value::SingleQuotedString(s)) = expr.as_ref() { - // trim the starting and ending curly braces - let s = s.trim_start_matches('{').trim_end_matches('}'); - // split string by comma - let split = s.split(','); - // match on data type, and create a vector of Expr::Value - match data_type { - DataType::Array(ArrayElemTypeDef::AngleBracket(inner)) - | DataType::Array(ArrayElemTypeDef::SquareBracket(inner)) => { - match inner.as_ref() { - DataType::Text | DataType::Char(_) | DataType::Varchar(_) => { - for s in split { - list.push(Expr::Value( - sqlparser::ast::Value::SingleQuotedString(s.to_string()), - )); - } - } - DataType::Integer(_) - | DataType::Float(_) - | DataType::BigInt(_) - | DataType::UnsignedBigInt(_) - | DataType::UnsignedInteger(_) - | DataType::UnsignedSmallInt(_) - | DataType::UnsignedTinyInt(_) - | DataType::TinyInt(_) - | DataType::UnsignedInt(_) => { - for s in split { - list.push(Expr::Value(sqlparser::ast::Value::Number( - s.to_string(), - false, - ))); - } - } - _ => { - return Err(anyhow::anyhow!( - "Unsupported inner data type for IN list: {:?}", - data_type - )) - } - } - } - _ => { - return Err(anyhow::anyhow!( - "Unsupported data type for IN list: {:?}", - data_type - )) - } - } - } else if let Expr::Array(arr) = expr.as_ref() { - list = self - .pour_array_into_list(arr, list) - .expect("Failed to transfer array to list"); - } - } else if let Expr::Array(arr) = expr { - list = self - .pour_array_into_list(arr, list) - .expect("Failed to transfer array to list"); - } - - Ok(list) - } } diff --git a/nexus/peer-bigquery/src/lib.rs b/nexus/peer-bigquery/src/lib.rs index 656760c2c8..a7a8fb8f2f 100644 --- a/nexus/peer-bigquery/src/lib.rs +++ b/nexus/peer-bigquery/src/lib.rs @@ -1,20 +1,18 @@ use 
diff --git a/nexus/peer-bigquery/src/lib.rs b/nexus/peer-bigquery/src/lib.rs
index 656760c2c8..a7a8fb8f2f 100644
--- a/nexus/peer-bigquery/src/lib.rs
+++ b/nexus/peer-bigquery/src/lib.rs
@@ -1,20 +1,18 @@
 use std::time::Duration;
 
 use anyhow::Context;
-use cursor::BigQueryCursorManager;
 use gcp_bigquery_client::{
     model::{query_request::QueryRequest, query_response::ResultSet},
-    Client,
+    yup_oauth2, Client,
 };
 use peer_connections::PeerConnectionTracker;
-use peer_cursor::{CursorModification, QueryExecutor, QueryOutput, Schema};
+use peer_cursor::{CursorManager, CursorModification, QueryExecutor, QueryOutput, Schema};
 use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
 use pt::peerdb_peers::BigqueryConfig;
-use sqlparser::ast::{CloseCursor, Expr, FetchDirection, Statement, Value};
+use sqlparser::ast::{CloseCursor, Declare, Expr, FetchDirection, Statement, Value};
 use stream::{BqRecordStream, BqSchema};
 
 mod ast;
-mod cursor;
 mod stream;
 
 pub struct BigQueryQueryExecutor {
@@ -23,7 +21,7 @@ pub struct BigQueryQueryExecutor {
     dataset_id: String,
     peer_connections: PeerConnectionTracker,
     client: Box<Client>,
-    cursor_manager: BigQueryCursorManager,
+    cursor_manager: CursorManager,
 }
 
 pub async fn bq_client_from_config(config: &BigqueryConfig) -> anyhow::Result<Client> {
@@ -53,15 +51,14 @@ impl BigQueryQueryExecutor {
         peer_connections: PeerConnectionTracker,
     ) -> anyhow::Result<Self> {
         let client = bq_client_from_config(config).await?;
-        let client = Box::new(client);
-        let cursor_manager = BigQueryCursorManager::new();
+
         Ok(Self {
             peer_name,
             project_id: config.project_id.clone(),
             dataset_id: config.dataset_id.clone(),
             peer_connections,
-            client,
-            cursor_manager,
+            client: Box::new(client),
+            cursor_manager: Default::default(),
         })
     }
 
@@ -100,8 +97,7 @@ impl QueryExecutor for BigQueryQueryExecutor {
         match stmt {
             Statement::Query(query) => {
                 let mut query = query.clone();
-                let bq_ast = ast::BigqueryAst::default();
-                bq_ast
+                ast::BigqueryAst
                     .rewrite(&self.dataset_id, &mut query)
                     .context("unable to rewrite query")
                     .map_err(|err| PgWireError::ApiError(err.into()))?;
@@ -119,15 +115,31 @@ impl QueryExecutor for BigQueryQueryExecutor {
                 );
                 Ok(QueryOutput::Stream(Box::pin(cursor)))
             }
-            Statement::Declare { name, query, .. } => {
-                let query_stmt = Statement::Query(query.clone());
-                self.cursor_manager
-                    .create_cursor(&name.value, &query_stmt, self)
-                    .await?;
-
-                Ok(QueryOutput::Cursor(CursorModification::Created(
-                    name.value.clone(),
-                )))
+            Statement::Declare { stmts } => {
+                if stmts.len() != 1 {
+                    Err(PgWireError::ApiError(
+                        "peerdb only supports singular declare statements".into(),
+                    ))
+                } else if let Declare {
+                    ref names,
+                    for_query: Some(ref query),
+                    ..
+                } = stmts[0]
+                {
+                    let name = &names[0];
+                    let query_stmt = Statement::Query(query.clone());
+                    self.cursor_manager
+                        .create_cursor(&name.value, &query_stmt, self)
+                        .await?;
+
+                    Ok(QueryOutput::Cursor(CursorModification::Created(
+                        name.value.clone(),
+                    )))
+                } else {
+                    Err(PgWireError::ApiError(
+                        "peerdb only supports declare for query statements".into(),
+                    ))
+                }
             }
             Statement::Fetch {
                 name, direction, ..
@@ -136,12 +148,16 @@ impl QueryExecutor for BigQueryQueryExecutor {
 
                 // Attempt to extract the count from the direction
                 let count = match direction {
+                    FetchDirection::ForwardAll | FetchDirection::All => usize::MAX,
+                    FetchDirection::Next | FetchDirection::Forward { limit: None } => 1,
                     FetchDirection::Count {
                         limit: sqlparser::ast::Value::Number(n, _),
                     }
                     | FetchDirection::Forward {
                         limit: Some(sqlparser::ast::Value::Number(n, _)),
-                    } => n.parse::<usize>(),
+                    } => n
+                        .parse::<usize>()
+                        .map_err(|err| PgWireError::ApiError(err.into()))?,
                     _ => {
                         return Err(PgWireError::UserError(Box::new(ErrorInfo::new(
                             "ERROR".to_owned(),
@@ -151,12 +167,6 @@ impl QueryExecutor for BigQueryQueryExecutor {
                     }
                 };
 
-                // If parsing the count resulted in an error, return an internal error
-                let count = match count {
-                    Ok(c) => c,
-                    Err(err) => return Err(PgWireError::ApiError(err.into())),
-                };
-
                 tracing::info!("fetching {} rows", count);
 
                 // Fetch rows from the cursor manager
@@ -166,14 +176,11 @@ impl QueryExecutor for BigQueryQueryExecutor {
                 Ok(QueryOutput::Records(records))
             }
             Statement::Close { cursor } => {
-                let mut closed_cursors = vec![];
-                match cursor {
-                    CloseCursor::All => {
-                        closed_cursors = self.cursor_manager.close_all_cursors().await?;
-                    }
+                let closed_cursors = match cursor {
+                    CloseCursor::All => self.cursor_manager.close_all_cursors().await?,
                     CloseCursor::Specific { name } => {
                         self.cursor_manager.close(&name.value).await?;
-                        closed_cursors.push(name.value.clone());
+                        vec![name.value.clone()]
                     }
                 };
                 Ok(QueryOutput::Cursor(CursorModification::Closed(
@@ -202,8 +209,7 @@ impl QueryExecutor for BigQueryQueryExecutor {
         match stmt {
             Statement::Query(query) => {
                 let mut query = query.clone();
-                let bq_ast = ast::BigqueryAst::default();
-                bq_ast
+                ast::BigqueryAst
                     .rewrite(&self.dataset_id, &mut query)
                     .context("unable to rewrite query")
                     .map_err(|err| PgWireError::ApiError(err.into()))?;
@@ -222,9 +228,23 @@ impl QueryExecutor for BigQueryQueryExecutor {
 
                 Ok(Some(schema.schema()))
             }
-            Statement::Declare { query, .. } => {
-                let query_stmt = Statement::Query(query.clone());
-                self.describe(&query_stmt).await
+            Statement::Declare { stmts } => {
+                if stmts.len() != 1 {
+                    Err(PgWireError::ApiError(
+                        "peerdb only supports singular declare statements".into(),
+                    ))
+                } else if let Declare {
+                    for_query: Some(ref query),
+                    ..
+                } = stmts[0]
+                {
+                    let query_stmt = Statement::Query(query.clone());
+                    self.describe(&query_stmt).await
+                } else {
+                    Err(PgWireError::ApiError(
+                        "peerdb only supports declare for query statements".into(),
+                    ))
+                }
             }
             _ => PgWireResult::Err(PgWireError::UserError(Box::new(ErrorInfo::new(
                 "ERROR".to_owned(),
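Both `execute` and `describe` now match the newer sqlparser shape, where one DECLARE statement can carry several declarations and a cursor declaration holds its query in `for_query`; PeerDB accepts exactly one declaration per statement. A small sketch of the AST being matched, assuming a sqlparser release with the `Declare { stmts }` variant:

```rust
use sqlparser::{ast::Statement, dialect::PostgreSqlDialect, parser::Parser};

fn main() -> Result<(), sqlparser::parser::ParserError> {
    let parsed = Parser::parse_sql(&PostgreSqlDialect {}, "DECLARE c CURSOR FOR SELECT 1")?;
    if let Statement::Declare { stmts } = &parsed[0] {
        // One declaration, whose cursor query sits in `for_query`.
        assert_eq!(stmts.len(), 1);
        assert!(stmts[0].for_query.is_some());
    }
    Ok(())
}
```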
diff --git a/nexus/peer-bigquery/src/stream.rs b/nexus/peer-bigquery/src/stream.rs
index 6019ec82be..d0f7e5b5b7 100644
--- a/nexus/peer-bigquery/src/stream.rs
+++ b/nexus/peer-bigquery/src/stream.rs
@@ -33,7 +33,7 @@ pub struct BqRecordStream {
     num_records: usize,
 }
 
-// covnert FieldType to pgwire FieldInfo's Type
+// convert FieldType to pgwire FieldInfo's Type
 fn convert_field_type(field_type: &FieldType) -> Type {
     match field_type {
         FieldType::Bool => Type::BOOL,
diff --git a/nexus/peer-connections/Cargo.toml b/nexus/peer-connections/Cargo.toml
index b57d7e53d7..78986b3b25 100644
--- a/nexus/peer-connections/Cargo.toml
+++ b/nexus/peer-connections/Cargo.toml
@@ -15,5 +15,5 @@ tokio-postgres = { version = "0.7.6", features = [
     "with-serde_json-1",
     "with-uuid-1",
 ] }
-tracing = "0.1"
+tracing.workspace = true
 uuid = { version = "1.0" }
diff --git a/nexus/peer-cursor/Cargo.toml b/nexus/peer-cursor/Cargo.toml
index bead6c92b0..74a2fe9de6 100644
--- a/nexus/peer-cursor/Cargo.toml
+++ b/nexus/peer-cursor/Cargo.toml
@@ -8,8 +8,10 @@ edition = "2021"
 [dependencies]
 anyhow = "1.0"
 async-trait = "0.1"
+dashmap.workspace = true
 futures = "0.3"
 pgwire.workspace = true
 sqlparser.workspace = true
 tokio = { version = "1.0", features = ["full"] }
+tracing.workspace = true
 value = { path = "../value" }
diff --git a/nexus/peer-cursor/src/lib.rs b/nexus/peer-cursor/src/lib.rs
index 08a7891f87..3a31531f4f 100644
--- a/nexus/peer-cursor/src/lib.rs
+++ b/nexus/peer-cursor/src/lib.rs
@@ -5,8 +5,11 @@ use pgwire::{api::results::FieldInfo, error::PgWireResult};
 use sqlparser::ast::Statement;
 use value::Value;
 
+mod manager;
 pub mod util;
 
+pub use manager::CursorManager;
+
 pub type Schema = Arc<Vec<FieldInfo>>;
 
 pub struct Record {
@@ -46,3 +49,9 @@ pub trait QueryExecutor: Send + Sync {
     async fn execute(&self, stmt: &Statement) -> PgWireResult<QueryOutput>;
     async fn describe(&self, stmt: &Statement) -> PgWireResult<Option<Schema>>;
 }
+
+pub struct Cursor {
+    position: usize,
+    stream: SendableStream,
+    schema: Schema,
+}
diff --git a/nexus/peer-bigquery/src/cursor.rs b/nexus/peer-cursor/src/manager.rs
similarity index 56%
rename from nexus/peer-bigquery/src/cursor.rs
rename to nexus/peer-cursor/src/manager.rs
index 52558600ef..5384b86362 100644
--- a/nexus/peer-bigquery/src/cursor.rs
+++ b/nexus/peer-cursor/src/manager.rs
@@ -1,54 +1,37 @@
 use dashmap::DashMap;
 use futures::StreamExt;
-use peer_cursor::{QueryExecutor, QueryOutput, Records, Schema, SendableStream};
 use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
 use sqlparser::ast::Statement;
 
-use crate::BigQueryQueryExecutor;
+use crate::{Cursor, QueryExecutor, QueryOutput, Records};
 
-pub struct BigQueryCursor {
-    position: usize,
-    stream: SendableStream,
-    schema: Schema,
+#[derive(Default)]
+pub struct CursorManager {
+    cursors: DashMap<String, Cursor>,
 }
 
-pub struct BigQueryCursorManager {
-    cursors: DashMap<String, BigQueryCursor>,
-}
-
-impl BigQueryCursorManager {
-    pub fn new() -> Self {
-        Self {
-            cursors: DashMap::new(),
-        }
-    }
-
+impl CursorManager {
     pub async fn create_cursor(
         &self,
         name: &str,
         stmt: &Statement,
-        executor: &BigQueryQueryExecutor,
+        executor: &dyn QueryExecutor,
     ) -> PgWireResult<()> {
-        // Execute the query to obtain a stream of records
         let output = executor.execute(stmt).await?;
 
         match output {
             QueryOutput::Stream(stream) => {
-                // Get the schema from the stream
                 let schema = stream.schema();
 
-                // Create a new cursor
-                let cursor = BigQueryCursor {
+                let cursor = Cursor {
                     position: 0,
                     stream,
                     schema,
                 };
 
-                // Store the cursor
                 self.cursors.insert(name.to_string(), cursor);
 
-                // log the cursor and statement
                 tracing::info!("Created cursor {} for statement '{}'", name, stmt);
 
                 Ok(())
@@ -66,28 +49,23 @@ impl BigQueryCursorManager {
             PgWireError::UserError(Box::new(ErrorInfo::new(
                 "ERROR".to_owned(),
                 "fdw_error".to_owned(),
-                format!("[bigquery] Cursor {} does not exist", name),
+                format!("Cursor {} does not exist", name),
             )))
         })?;
 
         let mut records = Vec::new();
-        let prev_end = cursor.position;
-        let mut cursor_position = cursor.position;
-        {
-            while cursor_position - prev_end < count {
-                match cursor.stream.next().await {
-                    Some(Ok(record)) => {
-                        records.push(record);
-                        cursor_position += 1;
-                        tracing::info!("cusror position: {}", cursor_position);
-                    }
+        while records.len() < count {
+            match cursor.stream.next().await {
+                Some(Ok(record)) => {
+                    records.push(record);
                 }
+                Some(Err(err)) => return Err(err),
+                None => break,
             }
         }
 
-        cursor.position = cursor_position;
+        tracing::info!("Cursor {} fetched {} records", name, records.len());
+        cursor.position += records.len();
 
         Ok(Records {
             records,
@@ -96,8 +74,7 @@ impl BigQueryCursorManager {
     }
 
     pub async fn close(&self, name: &str) -> PgWireResult<()> {
-        // log that we are removing the cursor from bq
-        tracing::info!("Removing cursor {} from BigQuery", name);
+        tracing::info!("Removing cursor {}", name);
 
         self.cursors
             .remove(name)
@@ -111,10 +88,8 @@ impl BigQueryCursorManager {
             .map(|_| ())
     }
 
-    // close all the cursors
     pub async fn close_all_cursors(&self) -> PgWireResult<Vec<String>> {
-        // log that we are removing all the cursors from bq
-        tracing::info!("Removing all cursors from BigQuery");
+        tracing::info!("Removing all cursors");
 
         let keys: Vec<_> = self
             .cursors
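The rewritten fetch loop drops the old `prev_end`/`cursor_position` bookkeeping and simply counts what the current call has pulled off the stream. The same logic in isolation, as a standalone sketch with assumed names (only the `futures` crate is required):

```rust
use futures::{Stream, StreamExt};

// Drain at most `count` items from a fallible record stream, stopping early
// if the stream ends; errors propagate as-is, like in CursorManager::fetch.
async fn drain_up_to<S, T, E>(stream: &mut S, count: usize) -> Result<Vec<T>, E>
where
    S: Stream<Item = Result<T, E>> + Unpin,
{
    let mut records = Vec::new();
    while records.len() < count {
        match stream.next().await {
            Some(Ok(record)) => records.push(record),
            Some(Err(err)) => return Err(err),
            None => break, // stream exhausted before reaching `count`
        }
    }
    Ok(records)
}
```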
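Taken together, the three rewrites make postgres_fdw-generated SQL palatable to MySQL: the peer-name qualifier is stripped from table names, `LIMIT 1::bigint` loses its cast, and `= ANY(...)` becomes an IN list. A hypothetical in-crate test of the first two (the exact rendered string is an assumption, not taken from the patch):

```rust
#[test]
fn rewrites_fdw_style_query() -> Result<(), sqlparser::parser::ParserError> {
    use sqlparser::{ast::Statement, dialect::PostgreSqlDialect, parser::Parser};

    let sql = "SELECT a FROM my.t LIMIT 1::BIGINT";
    let mut stmts = Parser::parse_sql(&PostgreSqlDialect {}, sql)?;
    if let Statement::Query(query) = &mut stmts[0] {
        crate::ast::rewrite_query("my", query);
        // Peer qualifier gone, cast on LIMIT stripped.
        assert_eq!(query.to_string(), "SELECT a FROM t LIMIT 1");
    }
    Ok(())
}
```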
diff --git a/nexus/peer-mysql/Cargo.toml b/nexus/peer-mysql/Cargo.toml
new file mode 100644
index 0000000000..3b28572be4
--- /dev/null
+++ b/nexus/peer-mysql/Cargo.toml
@@ -0,0 +1,27 @@
+[package]
+name = "peer-mysql"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+anyhow = "1.0"
+async-trait = "0.1"
+chrono.workspace = true
+futures = { version = "0.3.28", features = ["executor"] }
+mysql_async = { version = "0.34", default-features = false, features = ["minimal-rust", "rust_decimal", "chrono", "rustls-tls"] }
+peer-ast = { path = "../peer-ast" }
+peer-cursor = { path = "../peer-cursor" }
+peer-connections = { path = "../peer-connections" }
+pgwire.workspace = true
+pt = { path = "../pt" }
+rust_decimal.workspace = true
+serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0"
+serde_bytes = "0.11"
+sqlparser.workspace = true
+tracing.workspace = true
+tokio = { version = "1.0", features = ["full"] }
+tokio-stream = "0.1"
+value = { path = "../value" }
diff --git a/nexus/peer-mysql/src/ast.rs b/nexus/peer-mysql/src/ast.rs
new file mode 100644
index 0000000000..00c12b7dbf
--- /dev/null
+++ b/nexus/peer-mysql/src/ast.rs
@@ -0,0 +1,63 @@
+use std::ops::ControlFlow;
+
+use peer_ast::flatten_expr_to_in_list;
+use sqlparser::ast::{
+    visit_expressions_mut, visit_function_arg_mut, visit_relations_mut, Array, BinaryOperator,
+    DataType, Expr, FunctionArgExpr, Query,
+};
+
+pub fn rewrite_query(peername: &str, query: &mut Query) {
+    visit_relations_mut(query, |table| {
+        // if peer name is first part of table name, remove first part
+        if peername.eq_ignore_ascii_case(&table.0[0].value) {
+            table.0.remove(0);
+        }
+        ControlFlow::<()>::Continue(())
+    });
+
+    // postgres_fdw sends `limit 1` as `limit 1::bigint` which mysql chokes on
+    if let Some(Expr::Cast { expr, .. }) = &query.limit {
+        query.limit = Some((**expr).clone());
+    }
+
+    visit_function_arg_mut(query, |node| {
+        if let FunctionArgExpr::Expr(arg_expr) = node {
+            if let Expr::Cast {
+                data_type: DataType::Array(_),
+                ..
+            } = arg_expr
+            {
+                let list =
+                    flatten_expr_to_in_list(arg_expr).expect("failed to flatten in function");
+                let rewritten_array = Array {
+                    elem: list,
+                    named: true,
+                };
+                *node = FunctionArgExpr::Expr(Expr::Array(rewritten_array));
+            }
+        }
+
+        ControlFlow::<()>::Continue(())
+    });
+
+    // flatten ANY to IN operation overall.
+    visit_expressions_mut(query, |node| {
+        if let Expr::AnyOp {
+            left,
+            compare_op,
+            right,
+        } = node
+        {
+            if matches!(compare_op, BinaryOperator::Eq | BinaryOperator::NotEq) {
+                let list = flatten_expr_to_in_list(right).expect("failed to flatten");
+                *node = Expr::InList {
+                    expr: left.clone(),
+                    list,
+                    negated: matches!(compare_op, BinaryOperator::NotEq),
+                };
+            }
+        }
+
+        ControlFlow::<()>::Continue(())
+    });
+}
diff --git a/nexus/peer-mysql/src/client.rs b/nexus/peer-mysql/src/client.rs
new file mode 100644
index 0000000000..7081cc6087
--- /dev/null
+++ b/nexus/peer-mysql/src/client.rs
@@ -0,0 +1,53 @@
+use std::sync::Arc;
+
+use futures::StreamExt;
+use mysql_async::{self, prelude::Queryable};
+use tokio::{spawn, sync::mpsc};
+
+pub enum Response {
+    Row(mysql_async::Row),
+    Schema(Arc<[mysql_async::Column]>),
+    Err(mysql_async::Error),
+}
+
+pub struct Message {
+    pub query: String,
+    pub response: mpsc::Sender<Response>,
+}
+
+#[derive(Clone)]
+pub struct MyClient {
+    pub chan: mpsc::Sender<Message>,
+}
+
+impl MyClient {
+    pub async fn new(opts: mysql_async::Opts) -> mysql_async::Result<Self> {
+        let mut conn = mysql_async::Conn::new(opts).await?;
+        let (send, mut recv) = mpsc::channel(1);
+        spawn(async move {
+            while let Some(Message { query, response }) = recv.recv().await {
+                match conn.query_stream(query).await {
+                    Ok(stream) => {
+                        response.send(Response::Schema(stream.columns())).await.ok();
+                        stream
+                            .for_each_concurrent(1, |row| async {
+                                response
+                                    .send(match row {
+                                        Ok(row) => Response::Row(row),
+                                        Err(err) => Response::Err(err),
+                                    })
+                                    .await
+                                    .ok();
+                            })
+                            .await;
+                    }
+                    Err(e) => {
+                        response.send(Response::Err(e)).await.ok();
+                    }
+                }
+            }
+        });
+
+        Ok(MyClient { chan: send })
+    }
+}
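`MyClient` serializes all queries through one `mysql_async::Conn` owned by a spawned task; callers talk to it over an mpsc channel and receive the schema first, then rows, then a closed channel. A hypothetical in-crate call site (names as defined above):

```rust
use tokio::sync::mpsc;

// Send one query and count the rows that come back; the channel closes when
// the worker task drops the per-query response sender.
async fn run_query(client: &MyClient, sql: &str) -> mysql_async::Result<u64> {
    let (send, mut recv) = mpsc::channel(1);
    let message = Message {
        query: sql.to_string(),
        response: send,
    };
    client.chan.send(message).await.ok();

    let mut rows = 0;
    while let Some(resp) = recv.recv().await {
        match resp {
            Response::Schema(_) => {}      // arrives once, before any row
            Response::Row(_) => rows += 1, // then zero or more rows
            Response::Err(err) => return Err(err),
        }
    }
    Ok(rows)
}
```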
diff --git a/nexus/peer-mysql/src/lib.rs b/nexus/peer-mysql/src/lib.rs
new file mode 100644
index 0000000000..d57e83b932
--- /dev/null
+++ b/nexus/peer-mysql/src/lib.rs
@@ -0,0 +1,232 @@
+mod ast;
+mod client;
+mod stream;
+
+use std::fmt::Write;
+
+use peer_cursor::{
+    CursorManager, CursorModification, QueryExecutor, QueryOutput, RecordStream, Schema,
+};
+use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
+use pt::peerdb_peers::MySqlConfig;
+use sqlparser::ast::{CloseCursor, Declare, FetchDirection, Statement};
+use stream::MyRecordStream;
+
+pub struct MySqlQueryExecutor {
+    peer_name: String,
+    client: client::MyClient,
+    cursor_manager: CursorManager,
+}
+
+impl MySqlQueryExecutor {
+    pub async fn new(peer_name: String, config: &MySqlConfig) -> anyhow::Result<Self> {
+        let mut opts = mysql_async::OptsBuilder::default().prefer_socket(Some(false)); // prefer_socket breaks connecting to StarRocks
+        if !config.user.is_empty() {
+            opts = opts.user(Some(config.user.clone()))
+        }
+        if !config.password.is_empty() {
+            opts = opts.pass(Some(config.password.clone()))
+        }
+        if !config.database.is_empty() {
+            opts = opts.db_name(Some(config.database.clone()))
+        }
+        if !config.disable_tls {
+            opts = opts.ssl_opts(mysql_async::SslOpts::default())
+        }
+        opts = opts
+            .setup(config.setup.clone())
+            .compression(mysql_async::Compression::new(config.compression))
+            .ip_or_hostname(config.host.clone())
+            .tcp_port(config.port as u16);
+        let client = client::MyClient::new(opts.into()).await?;
+
+        Ok(Self {
+            peer_name,
+            client,
+            cursor_manager: Default::default(),
+        })
+    }
+
+    async fn query(&self, query: String) -> PgWireResult<MyRecordStream> {
+        MyRecordStream::query(self.client.clone(), query).await
+    }
+
+    async fn query_schema(&self, query: String) -> PgWireResult<Schema> {
+        let stream = MyRecordStream::query(self.client.clone(), query).await?;
+        Ok(stream.schema())
+    }
+}
+
+#[async_trait::async_trait]
+impl QueryExecutor for MySqlQueryExecutor {
+    // #[tracing::instrument(skip(self, stmt), fields(stmt = %stmt))]
+    async fn execute(&self, stmt: &Statement) -> PgWireResult<QueryOutput> {
+        // only support SELECT statements
+        match stmt {
+            Statement::Explain {
+                analyze,
+                format,
+                statement,
+                ..
+            } => {
+                if let Statement::Query(ref query) = **statement {
+                    let mut query = query.clone();
+                    ast::rewrite_query(&self.peer_name, &mut query);
+                    let mut querystr = String::from("EXPLAIN ");
+                    if *analyze {
+                        querystr.push_str("ANALYZE ");
+                    }
+                    if let Some(format) = format {
+                        write!(querystr, "FORMAT={} ", format).ok();
+                    }
+                    write!(querystr, "{}", query).ok();
+                    tracing::info!("mysql rewritten query: {}", query);
+
+                    let cursor = self.query(querystr).await?;
+                    Ok(QueryOutput::Stream(Box::pin(cursor)))
+                } else {
+                    let error = format!(
+                        "only EXPLAIN SELECT statements are supported in mysql. got: {}",
+                        statement
+                    );
+                    Err(PgWireError::UserError(Box::new(ErrorInfo::new(
+                        "ERROR".to_owned(),
+                        "fdw_error".to_owned(),
+                        error,
+                    ))))
+                }
+            }
+            Statement::Query(query) => {
+                let mut query = query.clone();
+                ast::rewrite_query(&self.peer_name, &mut query);
+                let query = query.to_string();
+                tracing::info!("mysql rewritten query: {}", query);
+
+                let cursor = self.query(query).await?;
+                Ok(QueryOutput::Stream(Box::pin(cursor)))
+            }
+            Statement::Declare { stmts } => {
+                if stmts.len() != 1 {
+                    Err(PgWireError::ApiError(
+                        "peerdb only supports singular declare statements".into(),
+                    ))
+                } else if let Declare {
+                    ref names,
+                    for_query: Some(ref query),
+                    ..
+                } = stmts[0]
+                {
+                    let name = &names[0];
+                    let mut query = query.clone();
+                    ast::rewrite_query(&self.peer_name, &mut query);
+                    let query_stmt = Statement::Query(query);
+                    self.cursor_manager
+                        .create_cursor(&name.value, &query_stmt, self)
+                        .await?;
+
+                    Ok(QueryOutput::Cursor(CursorModification::Created(
+                        name.value.clone(),
+                    )))
+                } else {
+                    Err(PgWireError::ApiError(
+                        "peerdb only supports declare for query statements".into(),
+                    ))
+                }
+            }
+            Statement::Fetch {
+                name, direction, ..
+            } => {
+                tracing::info!("fetching cursor for mysql: {}", name.value);
+
+                // Attempt to extract the count from the direction
+                let count = match direction {
+                    FetchDirection::ForwardAll | FetchDirection::All => usize::MAX,
+                    FetchDirection::Next | FetchDirection::Forward { limit: None } => 1,
+                    FetchDirection::Count {
+                        limit: sqlparser::ast::Value::Number(n, _),
+                    }
+                    | FetchDirection::Forward {
+                        limit: Some(sqlparser::ast::Value::Number(n, _)),
+                    } => n
+                        .parse::<usize>()
+                        .map_err(|err| PgWireError::ApiError(err.into()))?,
+                    _ => {
+                        return Err(PgWireError::UserError(Box::new(ErrorInfo::new(
+                            "ERROR".to_owned(),
+                            "fdw_error".to_owned(),
+                            "only FORWARD count and COUNT count are supported in FETCH".to_owned(),
+                        ))))
+                    }
+                };
+
+                tracing::info!("fetching {} rows", count);
+
+                // Fetch rows from the cursor manager
+                let records = self.cursor_manager.fetch(&name.value, count).await?;
+
+                // Return the fetched records as the query output
+                Ok(QueryOutput::Records(records))
+            }
+            Statement::Close { cursor } => {
+                let closed_cursors = match cursor {
+                    CloseCursor::All => self.cursor_manager.close_all_cursors().await?,
+                    CloseCursor::Specific { name } => {
+                        self.cursor_manager.close(&name.value).await?;
+                        vec![name.value.clone()]
+                    }
+                };
+                Ok(QueryOutput::Cursor(CursorModification::Closed(
+                    closed_cursors,
+                )))
+            }
+            _ => {
+                let error = format!(
+                    "only SELECT statements are supported in mysql. got: {}",
+                    stmt
+                );
+                Err(PgWireError::UserError(Box::new(ErrorInfo::new(
+                    "ERROR".to_owned(),
+                    "fdw_error".to_owned(),
+                    error,
+                ))))
+            }
+        }
+    }
+
+    // describe the output of the query
+    async fn describe(&self, stmt: &Statement) -> PgWireResult<Option<Schema>> {
+        // print the statement
+        tracing::info!("[mysql] describe: {}", stmt);
+        // only support SELECT statements
+        match stmt {
+            Statement::Query(query) => {
+                let mut query = query.clone();
+                ast::rewrite_query(&self.peer_name, &mut query);
+                Ok(Some(self.query_schema(query.to_string()).await?))
+            }
+            Statement::Declare { stmts } => {
+                if stmts.len() != 1 {
+                    Err(PgWireError::ApiError(
+                        "peerdb only supports singular declare statements".into(),
+                    ))
+                } else if let Declare {
+                    for_query: Some(ref query),
+                    ..
+                } = stmts[0]
+                {
+                    let query_stmt = Statement::Query(query.clone());
+                    self.describe(&query_stmt).await
+                } else {
+                    Err(PgWireError::ApiError(
+                        "peerdb only supports declare for query statements".into(),
+                    ))
+                }
+            }
+            _ => PgWireResult::Err(PgWireError::UserError(Box::new(ErrorInfo::new(
+                "ERROR".to_owned(),
+                "fdw_error".to_owned(),
+                "only SELECT statements are supported in mysql".to_owned(),
+            )))),
+        }
+    }
+}
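Wiring this up end to end only needs a `MySqlConfig`; the executor builds its `OptsBuilder` from it as shown in `new` above. A hedged sketch with placeholder connection values (the values here are assumptions, not taken from the patch):

```rust
use pt::peerdb_peers::MySqlConfig;

async fn demo() -> anyhow::Result<()> {
    let config = MySqlConfig {
        host: "db.example.com".to_string(), // placeholder host
        port: 3306,
        user: "app".to_string(),
        password: "secret".to_string(),
        database: "appdb".to_string(),
        setup: vec![],      // optional session-setup statements
        compression: 0,     // passed to mysql_async::Compression::new
        disable_tls: false, // keep TLS on by default
    };
    let _executor = MySqlQueryExecutor::new("my_peer".to_string(), &config).await?;
    // From here, execute()/describe() behave like any other QueryExecutor.
    Ok(())
}
```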
diff --git a/nexus/peer-mysql/src/stream.rs b/nexus/peer-mysql/src/stream.rs
new file mode 100644
index 0000000000..13f3c03cf4
--- /dev/null
+++ b/nexus/peer-mysql/src/stream.rs
@@ -0,0 +1,189 @@
+use std::{
+    pin::Pin,
+    sync::Arc,
+    task::{Context, Poll},
+};
+
+use crate::client::{self, MyClient};
+use futures::Stream;
+use mysql_async::consts::ColumnType;
+use mysql_async::{Column, Row};
+use peer_cursor::{Record, RecordStream, Schema};
+use pgwire::{
+    api::{
+        results::{FieldFormat, FieldInfo},
+        Type,
+    },
+    error::{PgWireError, PgWireResult},
+};
+use tokio::sync::mpsc;
+use tokio_stream::wrappers::ReceiverStream;
+use value::Value;
+
+pub struct MyRecordStream {
+    schema: Schema,
+    stream: ReceiverStream<client::Response>,
+}
+
+// convert ColumnType to pgwire FieldInfo's Type
+fn convert_field_type(field_type: ColumnType) -> Type {
+    match field_type {
+        ColumnType::MYSQL_TYPE_NULL | ColumnType::MYSQL_TYPE_UNKNOWN => Type::VOID,
+        ColumnType::MYSQL_TYPE_FLOAT => Type::FLOAT4,
+        ColumnType::MYSQL_TYPE_DOUBLE => Type::FLOAT8,
+        ColumnType::MYSQL_TYPE_YEAR => Type::INT2,
+        ColumnType::MYSQL_TYPE_TINY => Type::INT2,
+        ColumnType::MYSQL_TYPE_SHORT => Type::INT2,
+        ColumnType::MYSQL_TYPE_INT24 => Type::INT4,
+        ColumnType::MYSQL_TYPE_LONG => Type::INT4,
+        ColumnType::MYSQL_TYPE_LONGLONG => Type::INT8,
+        ColumnType::MYSQL_TYPE_DECIMAL | ColumnType::MYSQL_TYPE_NEWDECIMAL => Type::NUMERIC,
+        ColumnType::MYSQL_TYPE_VARCHAR
+        | ColumnType::MYSQL_TYPE_VAR_STRING
+        | ColumnType::MYSQL_TYPE_STRING
+        | ColumnType::MYSQL_TYPE_ENUM
+        | ColumnType::MYSQL_TYPE_SET => Type::TEXT,
+        ColumnType::MYSQL_TYPE_TINY_BLOB
+        | ColumnType::MYSQL_TYPE_MEDIUM_BLOB
+        | ColumnType::MYSQL_TYPE_LONG_BLOB
+        | ColumnType::MYSQL_TYPE_BLOB
+        | ColumnType::MYSQL_TYPE_BIT
+        | ColumnType::MYSQL_TYPE_GEOMETRY => Type::BYTEA,
+        ColumnType::MYSQL_TYPE_DATE | ColumnType::MYSQL_TYPE_NEWDATE => Type::DATE,
+        ColumnType::MYSQL_TYPE_TIME | ColumnType::MYSQL_TYPE_TIME2 => Type::TIME,
+        ColumnType::MYSQL_TYPE_TIMESTAMP
+        | ColumnType::MYSQL_TYPE_TIMESTAMP2
+        | ColumnType::MYSQL_TYPE_DATETIME
+        | ColumnType::MYSQL_TYPE_DATETIME2 => Type::TIMESTAMP,
+        ColumnType::MYSQL_TYPE_JSON => Type::JSONB,
+        ColumnType::MYSQL_TYPE_TYPED_ARRAY => Type::VOID,
+    }
+}
+
+pub fn schema_from_columns(columns: &[Column]) -> Schema {
+    Arc::new(
+        columns
+            .iter()
+            .map(|column| {
+                let datatype = convert_field_type(column.column_type());
+                FieldInfo::new(
+                    column.name_str().into_owned(),
+                    None,
+                    None,
+                    datatype,
+                    FieldFormat::Text,
+                )
+            })
+            .collect(),
+    )
+}
+
+impl MyRecordStream {
+    pub async fn query(conn: MyClient, query: String) -> PgWireResult<Self> {
+        let (send, mut recv) = mpsc::channel::<client::Response>(1);
+        conn.chan
+            .send(client::Message {
+                query,
+                response: send,
+            })
+            .await
+            .ok();
+
+        if let Some(first) = recv.recv().await {
+            match first {
+                client::Response::Row(..) => panic!("row received without schema"),
+                client::Response::Schema(schema) => Ok(MyRecordStream {
+                    schema: schema_from_columns(&schema),
+                    stream: ReceiverStream::new(recv),
+                }),
+                client::Response::Err(err) => Err(PgWireError::ApiError(err.into())),
+            }
+        } else {
+            Err(PgWireError::InvalidStartupMessage)
+        }
+    }
+}
+
+pub fn mysql_row_to_values(row: Row) -> Vec<Value> {
+    use mysql_async::from_value;
+    let columns = row.columns();
+    row.unwrap()
+        .into_iter()
+        .zip(columns.iter())
+        .map(|(val, col)| {
+            if val == mysql_async::Value::NULL {
+                Value::Null
+            } else {
+                match col.column_type() {
+                    ColumnType::MYSQL_TYPE_NULL | ColumnType::MYSQL_TYPE_UNKNOWN => Value::Null,
+                    ColumnType::MYSQL_TYPE_TINY => Value::TinyInt(from_value(val)),
+                    ColumnType::MYSQL_TYPE_SHORT | ColumnType::MYSQL_TYPE_YEAR => {
+                        Value::SmallInt(from_value(val))
+                    }
+                    ColumnType::MYSQL_TYPE_LONG | ColumnType::MYSQL_TYPE_INT24 => {
+                        Value::Integer(from_value(val))
+                    }
+                    ColumnType::MYSQL_TYPE_LONGLONG => Value::BigInt(from_value(val)),
+                    ColumnType::MYSQL_TYPE_FLOAT => Value::Float(from_value(val)),
+                    ColumnType::MYSQL_TYPE_DOUBLE => Value::Double(from_value(val)),
+                    ColumnType::MYSQL_TYPE_DECIMAL | ColumnType::MYSQL_TYPE_NEWDECIMAL => {
+                        Value::Numeric(from_value(val))
+                    }
+                    ColumnType::MYSQL_TYPE_VARCHAR
+                    | ColumnType::MYSQL_TYPE_VAR_STRING
+                    | ColumnType::MYSQL_TYPE_STRING
+                    | ColumnType::MYSQL_TYPE_ENUM
+                    | ColumnType::MYSQL_TYPE_SET => Value::Text(from_value(val)),
+                    ColumnType::MYSQL_TYPE_TINY_BLOB
+                    | ColumnType::MYSQL_TYPE_MEDIUM_BLOB
+                    | ColumnType::MYSQL_TYPE_LONG_BLOB
+                    | ColumnType::MYSQL_TYPE_BLOB
+                    | ColumnType::MYSQL_TYPE_BIT
+                    | ColumnType::MYSQL_TYPE_GEOMETRY => {
+                        Value::Binary(from_value::<Vec<u8>>(val).into())
+                    }
+                    ColumnType::MYSQL_TYPE_DATE | ColumnType::MYSQL_TYPE_NEWDATE => {
+                        Value::Date(from_value(val))
+                    }
+                    ColumnType::MYSQL_TYPE_TIME | ColumnType::MYSQL_TYPE_TIME2 => {
+                        Value::Time(from_value(val))
+                    }
+                    ColumnType::MYSQL_TYPE_TIMESTAMP
+                    | ColumnType::MYSQL_TYPE_TIMESTAMP2
+                    | ColumnType::MYSQL_TYPE_DATETIME
+                    | ColumnType::MYSQL_TYPE_DATETIME2 => Value::PostgresTimestamp(from_value(val)),
+                    ColumnType::MYSQL_TYPE_JSON => Value::JsonB(from_value(val)),
+                    ColumnType::MYSQL_TYPE_TYPED_ARRAY => Value::Null,
+                }
+            }
+        })
+        .collect()
+}
+
+impl Stream for MyRecordStream {
+    type Item = PgWireResult<Record>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let row_stream = &mut self.stream;
+        match Pin::new(row_stream).poll_next(cx) {
+            Poll::Ready(Some(client::Response::Row(row))) => Poll::Ready(Some(Ok(Record {
+                schema: self.schema.clone(),
+                values: mysql_row_to_values(row),
+            }))),
+            Poll::Ready(Some(client::Response::Schema(..))) => Poll::Ready(Some(Err(
+                PgWireError::ApiError("second schema received".into()),
+            ))),
+            Poll::Ready(Some(client::Response::Err(e))) => {
+                Poll::Ready(Some(Err(PgWireError::ApiError(e.into()))))
+            }
+            Poll::Ready(None) => Poll::Ready(None),
+            Poll::Pending => Poll::Pending,
+        }
+    }
+}
+
+impl RecordStream for MyRecordStream {
+    fn schema(&self) -> Schema {
+        self.schema.clone()
+    }
+}
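Because `MyRecordStream` implements `futures::Stream` with `PgWireResult<Record>` items (plus `RecordStream` for the schema), consumers can drain it with ordinary stream combinators. A small assumed consumer:

```rust
use futures::StreamExt;
use peer_cursor::RecordStream;
use pgwire::error::PgWireResult;

// Count rows while surfacing any mid-stream MySQL error as a PgWireError.
async fn count_rows(mut stream: MyRecordStream) -> PgWireResult<usize> {
    let _schema = stream.schema(); // available before the first row arrives
    let mut n = 0;
    while let Some(record) = stream.next().await {
        record?;
        n += 1;
    }
    Ok(n)
}
```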
diff --git a/nexus/peer-postgres/Cargo.toml b/nexus/peer-postgres/Cargo.toml
index d9439e5e01..873baa2673 100644
--- a/nexus/peer-postgres/Cargo.toml
+++ b/nexus/peer-postgres/Cargo.toml
@@ -8,7 +8,7 @@ edition = "2021"
 [dependencies]
 anyhow = "1.0"
 async-trait = "0.1"
-rust_decimal = { version = "1.30.0", features = [ "tokio-pg" ] }
+rust_decimal.workspace = true
 bytes = "1.0"
 chrono.workspace = true
 futures = "0.3"
@@ -28,6 +28,6 @@ tokio-postgres = { version = "0.7.6", features = [
     "with-serde_json-1",
     "with-uuid-1",
 ] }
-tracing = "0.1"
+tracing.workspace = true
 uuid = { version = "1.0", features = ["serde", "v4"] }
 value = { path = "../value" }
diff --git a/nexus/peer-snowflake/Cargo.toml b/nexus/peer-snowflake/Cargo.toml
index bbad53cce7..05dc416c5e 100644
--- a/nexus/peer-snowflake/Cargo.toml
+++ b/nexus/peer-snowflake/Cargo.toml
@@ -12,7 +12,6 @@ async-trait = "0.1.57"
 base64 = "0.22"
 catalog = { path = "../catalog" }
 chrono.workspace = true
-dashmap = "5.0"
 futures = "0.3"
 hex = "0.4"
 jsonwebtoken = { version = "9.0", features = ["use_pem"] }
@@ -27,6 +26,6 @@ serde_json = "1.0"
 sha2 = "0.10"
 sqlparser.workspace = true
 tokio = { version = "1.21", features = ["full"] }
-tracing = "0.1"
+tracing.workspace = true
 ureq = { version = "2", features = ["json", "charset"] }
 value = { path = "../value" }
diff --git a/nexus/peer-snowflake/src/ast.rs b/nexus/peer-snowflake/src/ast.rs
index 3dddd577df..0934ec5592 100644
--- a/nexus/peer-snowflake/src/ast.rs
+++ b/nexus/peer-snowflake/src/ast.rs
@@ -5,8 +5,7 @@ use sqlparser::ast::{
     FunctionArg, FunctionArgExpr, Ident, JsonOperator, ObjectName, Query, Statement, TimezoneInfo,
 };
 
-#[derive(Default)]
-pub struct SnowflakeAst {}
+pub struct SnowflakeAst;
 
 impl SnowflakeAst {
     pub fn rewrite(&self, query: &mut Query) -> anyhow::Result<()> {
diff --git a/nexus/peer-snowflake/src/cursor.rs b/nexus/peer-snowflake/src/cursor.rs
deleted file mode 100644
index 318a6d04d8..0000000000
--- a/nexus/peer-snowflake/src/cursor.rs
+++ /dev/null
@@ -1,124 +0,0 @@
-use crate::SnowflakeQueryExecutor;
-use dashmap::DashMap;
-use futures::StreamExt;
-use peer_cursor::{QueryExecutor, QueryOutput, Records, Schema, SendableStream};
-use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
-use sqlparser::ast::Statement;
-
-pub struct SnowflakeCursor {
-    position: usize,
-    stream: SendableStream,
-    schema: Schema,
-}
-
-pub struct SnowflakeCursorManager {
-    cursors: DashMap<String, SnowflakeCursor>,
-}
-
-impl SnowflakeCursorManager {
-    pub fn new() -> Self {
-        Self {
-            cursors: DashMap::new(),
-        }
-    }
-    pub async fn create_cursor(
-        &self,
-        name: &str,
-        stmt: &Statement,
-        executor: &SnowflakeQueryExecutor,
-    ) -> PgWireResult<()> {
-        // Execute the query to obtain a stream of records
-        let output = executor.execute(stmt).await?;
-
-        match output {
-            QueryOutput::Stream(stream) => {
-                // Get the schema from the stream
-                let schema = stream.schema();
-
-                // Create a new cursor
-                let cursor = SnowflakeCursor {
-                    position: 0,
-                    stream,
-                    schema,
-                };
-
-                // Store the cursor
-                self.cursors.insert(name.to_string(), cursor);
-
-                // log the cursor and statement
-                tracing::info!("Created cursor {} for statement '{}'", name, stmt);
-
-                Ok(())
-            }
-            _ => Err(PgWireError::UserError(Box::new(ErrorInfo::new(
-                "ERROR".to_owned(),
-                "fdw_error".to_owned(),
-                "Only SELECT queries can be used with cursors".to_owned(),
-            )))),
-        }
-    }
-
-    pub async fn fetch(&self, name: &str, count: usize) -> PgWireResult<Records> {
-        let mut cursor = self.cursors.get_mut(name).ok_or_else(|| {
-            PgWireError::UserError(Box::new(ErrorInfo::new(
-                "ERROR".to_owned(),
-                "fdw_error".to_owned(),
-                format!("[snowflake] Cursor {} does not exist", name),
-            )))
-        })?;
-
-        let mut records = Vec::new();
-        let prev_end = cursor.position;
-        let mut cursor_position = cursor.position;
-        {
-            while cursor_position - prev_end < count {
-                match cursor.stream.next().await {
-                    Some(Ok(record)) => {
-                        records.push(record);
-                        cursor_position += 1;
-                        tracing::info!("cursor position: {}", cursor_position);
-                    }
-                    Some(Err(err)) => return Err(err),
-                    None => break,
-                }
-            }
-        }
-
-        cursor.position = cursor_position;
-
-        Ok(Records {
-            records,
-            schema: cursor.schema.clone(),
-        })
-    }
-
-    pub async fn close(&self, name: &str) -> PgWireResult<()> {
-        // log that we are removing the cursor from bq
-        tracing::info!("Removing cursor {} from Snowflake", name);
-
-        self.cursors
-            .remove(name)
-            .ok_or_else(|| {
-                PgWireError::UserError(Box::new(ErrorInfo::new(
-                    "ERROR".to_owned(),
-                    "fdw_error".to_owned(),
-                    format!("Cursor {} does not exist", name),
-                )))
-            })
-            .map(|_| ())
-    }
-
-    // close all the cursors
-    pub async fn close_all_cursors(&self) -> PgWireResult<Vec<String>> {
-        // log that we are removing all the cursors from bq
-        tracing::info!("Removing all cursors from Snowflake");
-
-        let keys: Vec<_> = self
-            .cursors
-            .iter()
-            .map(|entry| entry.key().clone())
-            .collect();
-        self.cursors.clear();
-        Ok(keys)
-    }
-}
diff --git a/nexus/peer-snowflake/src/lib.rs b/nexus/peer-snowflake/src/lib.rs
index a940688b7e..d04a369d32 100644
--- a/nexus/peer-snowflake/src/lib.rs
+++ b/nexus/peer-snowflake/src/lib.rs
@@ -1,7 +1,6 @@
 use anyhow::Context;
 use async_recursion::async_recursion;
-use cursor::SnowflakeCursorManager;
-use peer_cursor::{CursorModification, QueryExecutor, QueryOutput, Schema};
+use peer_cursor::{CursorManager, CursorModification, QueryExecutor, QueryOutput, Schema};
 use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
 use std::cmp::min;
 use std::time::Duration;
@@ -12,7 +11,7 @@ use pt::peerdb_peers::SnowflakeConfig;
 use reqwest::{header, StatusCode};
 use secrecy::ExposeSecret;
 use serde::{Deserialize, Serialize};
-use sqlparser::ast::{CloseCursor, FetchDirection, Query, Statement};
+use sqlparser::ast::{CloseCursor, Declare, FetchDirection, Query, Statement};
 use tokio::time::sleep;
 use tracing::info;
 
@@ -20,7 +19,6 @@ use crate::stream::SnowflakeSchema;
 
 mod ast;
 mod auth;
-mod cursor;
 mod stream;
 
 const DEFAULT_REFRESH_THRESHOLD: u64 = 3000;
@@ -101,7 +99,7 @@ pub struct SnowflakeQueryExecutor {
     auth: SnowflakeAuth,
     query_timeout: u64,
     reqwest_client: reqwest::Client,
-    cursor_manager: SnowflakeCursorManager,
+    cursor_manager: CursorManager,
 }
 
 enum QueryAttemptResult {
@@ -127,7 +125,7 @@ impl SnowflakeQueryExecutor {
             .gzip(true)
             .default_headers(default_headers)
             .build()?;
-        let cursor_manager = SnowflakeCursorManager::new();
+
         Ok(Self {
             config: config.clone(),
             partition_number: 0,
@@ -146,7 +144,7 @@ impl SnowflakeQueryExecutor {
             )?,
             query_timeout: config.query_timeout,
             reqwest_client,
-            cursor_manager,
+            cursor_manager: Default::default(),
         })
     }
 
@@ -200,8 +198,7 @@ impl SnowflakeQueryExecutor {
 
     pub async fn query(&self, query: &Query) -> PgWireResult<ResultSet> {
         let mut query = query.clone();
-        let ast = ast::SnowflakeAst::default();
-        let _ = ast.rewrite(&mut query);
+        let _ = ast::SnowflakeAst.rewrite(&mut query);
 
         let query_str: String = query.to_string();
         info!("Processing SnowFlake query: {}", query_str);
@@ -301,8 +298,7 @@ impl QueryExecutor for SnowflakeQueryExecutor {
             Statement::Query(query) => {
                 let mut new_query = query.clone();
 
-                let snowflake_ast = ast::SnowflakeAst::default();
-                snowflake_ast
+                ast::SnowflakeAst
                     .rewrite(&mut new_query)
                     .context("unable to rewrite query")
                    .map_err(|err| PgWireError::ApiError(err.into()))?;
@@ -318,15 +314,31 @@ impl QueryExecutor for SnowflakeQueryExecutor {
                 );
                 Ok(QueryOutput::Stream(Box::pin(cursor)))
             }
-            Statement::Declare { name, query, .. } => {
-                let query_stmt = Statement::Query(query.clone());
-                self.cursor_manager
-                    .create_cursor(&name.value, &query_stmt, self)
-                    .await?;
-
-                Ok(QueryOutput::Cursor(CursorModification::Created(
-                    name.value.clone(),
-                )))
+            Statement::Declare { stmts } => {
+                if stmts.len() != 1 {
+                    Err(PgWireError::ApiError(
+                        "peerdb only supports singular declare statements".into(),
+                    ))
+                } else if let Declare {
+                    ref names,
+                    for_query: Some(ref query),
+                    ..
+                } = stmts[0]
+                {
+                    let name = &names[0];
+                    let query_stmt = Statement::Query(query.clone());
+                    self.cursor_manager
+                        .create_cursor(&name.value, &query_stmt, self)
+                        .await?;
+
+                    Ok(QueryOutput::Cursor(CursorModification::Created(
+                        name.value.clone(),
+                    )))
+                } else {
+                    Err(PgWireError::ApiError(
+                        "peerdb only supports declare for query statements".into(),
+                    ))
+                }
             }
             Statement::Fetch {
                 name, direction, ..
@@ -335,12 +347,16 @@ impl QueryExecutor for SnowflakeQueryExecutor {
 
                 // Attempt to extract the count from the direction
                 let count = match direction {
+                    FetchDirection::ForwardAll | FetchDirection::All => usize::MAX,
+                    FetchDirection::Next | FetchDirection::Forward { limit: None } => 1,
                     FetchDirection::Count {
                         limit: sqlparser::ast::Value::Number(n, _),
                     }
                    | FetchDirection::Forward {
                         limit: Some(sqlparser::ast::Value::Number(n, _)),
-                    } => n.parse::<usize>(),
+                    } => n
+                        .parse::<usize>()
+                        .map_err(|err| PgWireError::ApiError(err.into()))?,
                     _ => {
                         return Err(PgWireError::UserError(Box::new(ErrorInfo::new(
                             "ERROR".to_owned(),
@@ -350,12 +366,6 @@ impl QueryExecutor for SnowflakeQueryExecutor {
                     }
                 };
 
-                // If parsing the count resulted in an error, return an internal error
-                let count = match count {
-                    Ok(c) => c,
-                    Err(err) => return Err(PgWireError::ApiError(err.into())),
-                };
-
                 tracing::info!("fetching {} rows", count);
 
                 // Fetch rows from the cursor manager
@@ -365,14 +375,11 @@ impl QueryExecutor for SnowflakeQueryExecutor {
                 Ok(QueryOutput::Records(records))
             }
             Statement::Close { cursor } => {
-                let mut closed_cursors = vec![];
-                match cursor {
-                    CloseCursor::All => {
-                        closed_cursors = self.cursor_manager.close_all_cursors().await?;
-                    }
+                let closed_cursors = match cursor {
+                    CloseCursor::All => self.cursor_manager.close_all_cursors().await?,
                     CloseCursor::Specific { name } => {
                         self.cursor_manager.close(&name.value).await?;
-                        closed_cursors.push(name.value.clone());
+                        vec![name.value.clone()]
                     }
                 };
                 Ok(QueryOutput::Cursor(CursorModification::Closed(
@@ -397,22 +404,33 @@ impl QueryExecutor for SnowflakeQueryExecutor {
         match stmt {
             Statement::Query(query) => {
                 let mut new_query = query.clone();
-                let sf_ast = ast::SnowflakeAst::default();
-                sf_ast
+                ast::SnowflakeAst
                     .rewrite(&mut new_query)
                     .context("unable to rewrite query")
                     .map_err(|err| PgWireError::ApiError(err.into()))?;
 
-                // new_query.limit = Some(Expr::Value(Value::Number("1".to_owned(), false)));
-
                 let result_set = self.query(&new_query).await?;
 
                 let schema = SnowflakeSchema::from_result_set(&result_set);
 
                 Ok(Some(schema.schema()))
             }
-            Statement::Declare { query, .. } => {
-                let query_stmt = Statement::Query(query.clone());
-                self.describe(&query_stmt).await
+            Statement::Declare { stmts } => {
+                if stmts.len() != 1 {
+                    Err(PgWireError::ApiError(
+                        "peerdb only supports singular declare statements".into(),
+                    ))
+                } else if let Declare {
+                    for_query: Some(ref query),
+                    ..
+ } = stmts[0] + { + let query_stmt = Statement::Query(query.clone()); + self.describe(&query_stmt).await + } else { + Err(PgWireError::ApiError( + "peerdb only supports declare for query statements".into(), + )) + } } _ => PgWireResult::Err(PgWireError::UserError(Box::new(ErrorInfo::new( "ERROR".to_owned(), diff --git a/nexus/postgres-connection/Cargo.toml b/nexus/postgres-connection/Cargo.toml index e4df6aa275..cb86472016 100644 --- a/nexus/postgres-connection/Cargo.toml +++ b/nexus/postgres-connection/Cargo.toml @@ -13,4 +13,4 @@ urlencoding = "2" tokio-postgres = "0.7.2" tokio-postgres-rustls = "0.12" tokio = { version = "1", features = ["full"] } -tracing = "0.1" +tracing.workspace = true diff --git a/nexus/pt/src/lib.rs b/nexus/pt/src/lib.rs index 23487c80b3..137cb3909f 100644 --- a/nexus/pt/src/lib.rs +++ b/nexus/pt/src/lib.rs @@ -26,6 +26,7 @@ impl From for DbType { PeerType::Postgres => DbType::Postgres, PeerType::S3 => DbType::S3, PeerType::SQLServer => DbType::Sqlserver, + PeerType::MySql => DbType::Mysql, PeerType::Kafka => DbType::Kafka, PeerType::Eventhubs => DbType::Eventhubs, PeerType::PubSub => DbType::Pubsub, diff --git a/nexus/server/Cargo.toml b/nexus/server/Cargo.toml index 8c87a2480d..fc12071bc4 100644 --- a/nexus/server/Cargo.toml +++ b/nexus/server/Cargo.toml @@ -37,13 +37,14 @@ bytes = "1.0" catalog = { path = "../catalog" } clap = { version = "4.0", features = ["derive", "env"] } console-subscriber = "0.2" -dashmap = "5.0" +dashmap.workspace = true dotenvy = "0.15.7" flow-rs = { path = "../flow-rs" } futures = { version = "0.3.28", features = ["executor"] } peer-bigquery = { path = "../peer-bigquery" } peer-connections = { path = "../peer-connections" } peer-cursor = { path = "../peer-cursor" } +peer-mysql = { path = "../peer-mysql" } peer-postgres = { path = "../peer-postgres" } peer-snowflake = { path = "../peer-snowflake" } peerdb-parser = { path = "../parser" } @@ -54,7 +55,7 @@ serde_json = "1.0" rand = "0.8" time = "0.3" tokio = { version = "1", features = ["full"] } -tracing = "0.1" +tracing.workspace = true tracing-appender = "0.2" tracing-subscriber = "0.3" uuid = "1.0" @@ -62,4 +63,4 @@ cargo-deb = "2.0" [dev-dependencies] postgres = "0.19.4" -sha256 = "1.0.3" +similar = "2" diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index d5cb4c0dcc..51d2999890 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -14,7 +14,6 @@ use cursor::PeerCursors; use dashmap::{mapref::entry::Entry as DashEntry, DashMap}; use flow_rs::grpc::{FlowGrpcClient, PeerValidationResult}; use futures::join; -use peer_bigquery::BigQueryQueryExecutor; use peer_connections::{PeerConnectionTracker, PeerConnections}; use peer_cursor::{ util::{records_to_query_response, sendable_stream_to_query_response}, @@ -27,9 +26,12 @@ use pgwire::{ md5pass::{hash_md5_password, MakeMd5PasswordAuthStartupHandler}, AuthSource, LoginInfo, Password, ServerParameterProvider, }, - portal::{Format, Portal}, - query::{ExtendedQueryHandler, SimpleQueryHandler, StatementOrPortal}, - results::{DescribeResponse, Response, Tag}, + portal::Portal, + query::{ExtendedQueryHandler, SimpleQueryHandler}, + results::{ + DescribePortalResponse, DescribeResponse, DescribeStatementResponse, Response, Tag, + }, + stmt::StoredStatement, ClientInfo, MakeHandler, Type, }, error::{ErrorInfo, PgWireError, PgWireResult}, @@ -67,10 +69,7 @@ impl AuthSource for FixedPasswordAuthSource { let salt = rand::thread_rng().gen::<[u8; 4]>(); let password = &self.password; let hash_password = 
hash_md5_password(login_info.user().unwrap_or(""), password, &salt);
-        Ok(Password::new(
-            Some(salt.to_vec()),
-            hash_password.as_bytes().to_vec(),
-        ))
+        Ok(Password::new(Some(salt.to_vec()), Vec::from(hash_password)))
     }
 }
 
@@ -125,7 +124,7 @@ impl NexusBackend {
                 Ok(vec![res])
             }
             QueryOutput::Cursor(cm) => {
-                tracing::info!("cursor modification: {:?}", cm);
+                tracing::info!("cursor modification: {:?} {}", cm, peer_holder.is_some());
                 let mut peer_cursors = self.peer_cursors.lock().await;
                 match cm {
                     peer_cursor::CursorModification::Created(cursor_name) => {
@@ -145,13 +144,6 @@ impl NexusBackend {
         }
     }
 
-    fn is_peer_validity_supported(peer_type: i32) -> bool {
-        let unsupported_peer_types = [
-            11, // EVENTHUBS
-        ];
-        !unsupported_peer_types.contains(&peer_type)
-    }
-
     async fn check_for_mirror(
         catalog: &Catalog,
         flow_name: &str,
@@ -192,10 +184,14 @@ impl NexusBackend {
 
     async fn validate_peer<'a>(&self, peer: &Peer) -> anyhow::Result<()> {
         //if flow handler does not exist, skip validation
-        if self.flow_handler.is_none() {
+        let mut flow_handler = if let Some(ref flow_handler) = self.flow_handler {
+            flow_handler.as_ref()
+        } else {
             return Ok(());
         }
-        let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await;
+        .lock()
+        .await;
+
         let validate_request = pt::peerdb_route::ValidatePeerRequest {
             peer: Some(Peer {
                 name: peer.name.clone(),
@@ -361,16 +357,13 @@ impl NexusBackend {
                 peer,
                 if_not_exists: _,
             } => {
-                let peer_type = peer.r#type;
-                if Self::is_peer_validity_supported(peer_type) {
-                    self.validate_peer(peer).await.map_err(|e| {
-                        PgWireError::UserError(Box::new(ErrorInfo::new(
-                            "ERROR".to_owned(),
-                            "internal_error".to_owned(),
-                            e.to_string(),
-                        )))
-                    })?;
-                }
+                self.validate_peer(peer).await.map_err(|e| {
+                    PgWireError::UserError(Box::new(ErrorInfo::new(
+                        "ERROR".to_owned(),
+                        "internal_error".to_owned(),
+                        e.to_string(),
+                    )))
+                })?;
 
                 self.catalog.create_peer(peer.as_ref()).await.map_err(|e| {
                     PgWireError::UserError(Box::new(ErrorInfo::new(
@@ -787,6 +780,11 @@ impl NexusBackend {
                     self.execute_statement(executor.as_ref(), &stmt, None).await
                 }
 
+            NexusStatement::Rollback { stmt } => {
+                self.execute_statement(self.catalog.as_ref(), &stmt, None)
+                    .await
+            }
+
             NexusStatement::Empty => Ok(vec![Response::EmptyQuery]),
         }
     }
@@ -832,7 +830,7 @@ impl NexusBackend {
             DashEntry::Vacant(entry) => {
                 let executor: Arc<dyn QueryExecutor> = match &peer.config {
                     Some(Config::BigqueryConfig(ref c)) => {
-                        let executor = BigQueryQueryExecutor::new(
+                        let executor = peer_bigquery::BigQueryQueryExecutor::new(
                             peer.name.clone(),
                             c,
                             self.peer_connections.clone(),
@@ -840,6 +838,11 @@ impl NexusBackend {
                         .await?;
                         Arc::new(executor)
                     }
+                    Some(Config::MysqlConfig(ref c)) => {
+                        let executor =
+                            peer_mysql::MySqlQueryExecutor::new(peer.name.clone(), c).await?;
+                        Arc::new(executor)
+                    }
                     Some(Config::PostgresConfig(ref c)) => {
                         let executor =
                             peer_postgres::PostgresQueryExecutor::new(peer.name.clone(), c).await?;
@@ -859,6 +862,61 @@ impl NexusBackend {
             }
         })
     }
+
+    async fn do_describe(&self, stmt: &NexusParsedStatement) -> PgWireResult<Option<Schema>> {
+        tracing::info!("[eqp] do_describe: {}", stmt.query);
+        let stmt = &stmt.statement;
+        match stmt {
+            NexusStatement::PeerDDL { .. } => Ok(None),
+            NexusStatement::PeerCursor { .. } => Ok(None),
+            NexusStatement::Empty => Ok(None),
+            NexusStatement::Rollback { .. } => Ok(None),
+            NexusStatement::PeerQuery { stmt, assoc } => {
+                let schema: Option<Schema> = match assoc {
+                    QueryAssociation::Peer(peer) => match &peer.config {
+                        Some(Config::BigqueryConfig(_)) => {
+                            let executor = self.get_peer_executor(peer).await.map_err(|err| {
+                                PgWireError::ApiError(
+                                    format!("unable to get peer executor: {:?}", err).into(),
+                                )
+                            })?;
+                            executor.describe(stmt).await?
+                        }
+                        Some(Config::MysqlConfig(_)) => {
+                            let executor = self.get_peer_executor(peer).await.map_err(|err| {
+                                PgWireError::ApiError(
+                                    format!("unable to get peer executor: {:?}", err).into(),
+                                )
+                            })?;
+                            executor.describe(stmt).await?
+                        }
+                        Some(Config::PostgresConfig(_)) => {
+                            let executor = self.get_peer_executor(peer).await.map_err(|err| {
+                                PgWireError::ApiError(
+                                    format!("unable to get peer executor: {:?}", err).into(),
+                                )
+                            })?;
+                            executor.describe(stmt).await?
+                        }
+                        Some(Config::SnowflakeConfig(_)) => {
+                            let executor = self.get_peer_executor(peer).await.map_err(|err| {
+                                PgWireError::ApiError(
+                                    format!("unable to get peer executor: {:?}", err).into(),
+                                )
+                            })?;
+                            executor.describe(stmt).await?
+                        }
+                        _ => {
+                            panic!("peer type not supported: {:?}", peer)
+                        }
+                    },
+                    QueryAssociation::Catalog => self.catalog.describe(stmt).await?,
+                };
+
+                Ok(if self.peerdb_fdw_mode { None } else { schema })
+            }
+        }
+    }
 }
 
 #[async_trait]
@@ -955,91 +1013,38 @@ impl ExtendedQueryHandler for NexusBackend {
         }
     }
 
-    async fn do_describe(
+    async fn do_describe_portal(
         &self,
         _client: &mut C,
-        target: StatementOrPortal<'_, Self::Statement>,
-    ) -> PgWireResult<DescribeResponse>
+        target: &Portal<Self::Statement>,
+    ) -> PgWireResult<DescribePortalResponse>
     where
         C: ClientInfo + Unpin + Send + Sync,
     {
-        let (param_types, stmt, _format) = match target {
-            StatementOrPortal::Statement(stmt) => {
-                let param_types = Some(&stmt.parameter_types);
-                (param_types, &stmt.statement, &Format::UnifiedBinary)
-            }
-            StatementOrPortal::Portal(portal) => (
-                None,
-                &portal.statement.statement,
-                &portal.result_column_format,
-            ),
-        };
+        Ok(
+            if let Some(schema) = self.do_describe(&target.statement.statement).await? {
+                DescribePortalResponse::new((*schema).clone())
+            } else {
+                DescribePortalResponse::no_data()
+            },
+        )
+    }
 
-        tracing::info!("[eqp] do_describe: {}", stmt.query);
-        let stmt = &stmt.statement;
-        match stmt {
-            NexusStatement::PeerDDL { .. } => Ok(DescribeResponse::no_data()),
-            NexusStatement::PeerCursor { .. } => Ok(DescribeResponse::no_data()),
-            NexusStatement::Empty => Ok(DescribeResponse::no_data()),
-            NexusStatement::PeerQuery { stmt, assoc } => {
-                let schema: Option<Schema> = match assoc {
-                    QueryAssociation::Peer(peer) => {
-                        // if the peer is of type bigquery, let us route the query to bq.
-                        match &peer.config {
-                            Some(Config::BigqueryConfig(_)) => {
-                                let executor =
-                                    self.get_peer_executor(peer).await.map_err(|err| {
-                                        PgWireError::ApiError(
-                                            format!("unable to get peer executor: {:?}", err)
-                                                .into(),
-                                        )
-                                    })?;
-                                executor.describe(stmt).await?
-                            }
-                            Some(Config::PostgresConfig(_)) => {
-                                let executor =
-                                    self.get_peer_executor(peer).await.map_err(|err| {
-                                        PgWireError::ApiError(
-                                            format!("unable to get peer executor: {:?}", err)
-                                                .into(),
-                                        )
-                                    })?;
-                                executor.describe(stmt).await?
-                            }
-                            Some(Config::SnowflakeConfig(_)) => {
-                                let executor =
                                    self.get_peer_executor(peer).await.map_err(|err| {
-                                        PgWireError::ApiError(
-                                            format!("unable to get peer executor: {:?}", err)
-                                                .into(),
-                                        )
-                                    })?;
-                                executor.describe(stmt).await?
-                            }
-                            Some(_peer) => {
-                                panic!("peer type not supported: {:?}", peer)
-                            }
-                            None => {
-                                panic!("peer type not supported: {:?}", peer)
-                            }
-                        }
-                    }
-                    QueryAssociation::Catalog => self.catalog.describe(stmt).await?,
-                };
-                if let Some(described_schema) = schema {
-                    if self.peerdb_fdw_mode {
-                        Ok(DescribeResponse::no_data())
-                    } else {
-                        Ok(DescribeResponse::new(
-                            param_types.cloned(),
-                            (*described_schema).clone(),
-                        ))
-                    }
-                } else {
-                    Ok(DescribeResponse::no_data())
-                }
-            }
-        }
+    async fn do_describe_statement(
+        &self,
+        _client: &mut C,
+        target: &StoredStatement<Self::Statement>,
+    ) -> PgWireResult<DescribeStatementResponse>
+    where
+        C: ClientInfo + Unpin + Send + Sync,
+    {
+        Ok(
+            if let Some(schema) = self.do_describe(&target.statement).await? {
+                DescribeStatementResponse::new(target.parameter_types.clone(), (*schema).clone())
+            } else {
+                DescribeStatementResponse::no_data()
+            },
+        )
     }
 }
diff --git a/nexus/server/tests/results/expected/bq.sql.out b/nexus/server/tests/results/expected/bq.sql.out
index 2c186752a4..914bfd8ec9 100644
--- a/nexus/server/tests/results/expected/bq.sql.out
+++ b/nexus/server/tests/results/expected/bq.sql.out
@@ -151,7 +151,7 @@ test
 1
 2
 2022-06-22 23:35:53.000000
-true
+t
 12
 1
 1
diff --git a/nexus/server/tests/results/expected/postgres.sql.out b/nexus/server/tests/results/expected/postgres.sql.out
index 7f13da52f5..ab820deeb0 100644
--- a/nexus/server/tests/results/expected/postgres.sql.out
+++ b/nexus/server/tests/results/expected/postgres.sql.out
@@ -5,8 +5,8 @@
 4294967295
 281474976710656
 1
-false
-true
+f
+t
 2005-10-10
 2005-10-10 00:00:00.000000
 2005-10-11 00:00:00.000000
@@ -51,17 +51,17 @@ deadbeef
 255.255.255.0
 192.168.0.0/24
 17
-true
+t
 26
 17
-true
+t
 26
 24
-true
+t
 26
 24
-true
+t
 26
 1
-true
+t
 1
diff --git a/nexus/server/tests/server_test.rs b/nexus/server/tests/server_test.rs
index 7738e0115c..49ec206e93 100644
--- a/nexus/server/tests/server_test.rs
+++ b/nexus/server/tests/server_test.rs
@@ -1,4 +1,3 @@
-use postgres::{Client, NoTls, SimpleQueryMessage};
 use std::{
     fs::{read_dir, File},
     io::{prelude::*, BufReader, Write},
@@ -7,7 +6,12 @@ use std::{
     thread,
     time::Duration,
 };
+
+use postgres::{Client, NoTls, SimpleQueryMessage};
+use similar::TextDiff;
+
 mod create_peers;
+
 fn input_files() -> Vec<String> {
     let sql_directory = read_dir("tests/sql").unwrap();
     sql_directory
@@ -16,8 +20,9 @@ fn input_files() -> Vec<String> {
             sql_file
                 .path()
                 .file_name()
-                .and_then(|n| n.to_str().map(String::from))
+                .and_then(|n| n.to_str())
                 .filter(|n| n.ends_with(".sql"))
+                .map(String::from)
         })
     })
    .collect::<Vec<String>>()
@@ -112,8 +117,8 @@ fn server_test() {
         let expected_output_path = ["tests/results/expected/", file, ".out"].concat();
         let mut output_file = File::create(["tests/results/actual/", file, ".out"].concat())
             .expect("Unable to create result file");
+
         for query in queries {
-            let mut output = Vec::new();
             dbg!(query.as_str());
 
             // filter out comments and empty lines
@@ -132,59 +137,43 @@ fn server_test() {
             match res[0] {
                 // Fetch column names for the output
                 SimpleQueryMessage::Row(ref simplerow) => {
-                    for column in simplerow.columns() {
-                        column_names.push(column.name());
-                    }
+                    column_names.extend(simplerow.columns().iter().map(|column| column.name()));
                 }
                 SimpleQueryMessage::CommandComplete(_x) => (),
                 _ => (),
             };
 
-            res.iter().for_each(|row| {
-                for column_head in &column_names {
-                    let row_parse = match row {
-                        SimpleQueryMessage::Row(ref simplerow) => simplerow.get(column_head),
-                        SimpleQueryMessage::CommandComplete(_x) => None,
-                        _ => None,
-                    };
-
-                    let row_value = match row_parse {
-                        None => {
-                            continue;
+            let 
mut output = String::new(); + for row in res { + if let SimpleQueryMessage::Row(simplerow) = row { + for idx in 0..simplerow.len() { + if let Some(val) = simplerow.get(idx) { + output.push_str(val); + output.push('\n'); } - Some(x) => x, - }; - output.push(row_value.to_owned()); + } } - }); - - for i in &output { - let output_line = (*i).as_bytes(); - output_file - .write_all(output_line) - .expect("Unable to write query output"); - output_file - .write_all("\n".as_bytes()) - .expect("Output file write failure"); } + output_file + .write_all(output.as_bytes()) + .expect("Unable to write query output"); // flush the output file output_file.flush().expect("Unable to flush output file"); } - // Compare hash of expected and obtained files - let obtained_file = std::fs::read(&actual_output_path).unwrap(); - let expected_file = std::fs::read(&expected_output_path).unwrap(); - let obtained_hash = sha256::digest(obtained_file.as_slice()); - let expected_hash = sha256::digest(expected_file.as_slice()); - + let obtained_file = std::fs::read_to_string(&actual_output_path).unwrap(); + let expected_file = std::fs::read_to_string(&expected_output_path).unwrap(); // if there is a mismatch, print the diff, along with the path. - if obtained_hash != expected_hash { - tracing::info!("expected: {expected_output_path}"); - tracing::info!("obtained: {actual_output_path}"); - } + if obtained_file != expected_file { + tracing::info!("failed: {file}"); + let diff = TextDiff::from_lines(&expected_file, &obtained_file); + for change in diff.iter_all_changes() { + print!("{}{}", change.tag(), change); + } - assert_eq!(obtained_hash, expected_hash); + panic!("result didn't match expected output"); + } }); } diff --git a/nexus/value/Cargo.toml b/nexus/value/Cargo.toml index 2d41188e76..4071b1dea2 100644 --- a/nexus/value/Cargo.toml +++ b/nexus/value/Cargo.toml @@ -9,7 +9,6 @@ edition = "2021" base64 = "0.22" bytes = "1.1" chrono.workspace = true -rust_decimal = { version = "1.30.0", features = [ "tokio-pg" ] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" hex = "0.4" @@ -17,4 +16,5 @@ pgwire.workspace = true postgres = { version = "0.19", features = ["with-chrono-0_4"] } postgres-inet = "0.19.0" postgres-types = { version = "0.2.5", features = ["array-impls"] } +rust_decimal.workspace = true uuid = { version = "1.0", features = ["serde", "v4"] } diff --git a/protos/peers.proto b/protos/peers.proto index 69d748c5a4..13cabf58cf 100644 --- a/protos/peers.proto +++ b/protos/peers.proto @@ -126,6 +126,17 @@ message SqlServerConfig { string database = 5; } +message MySqlConfig { + string host = 1; + uint32 port = 2; + string user = 3; + string password = 4; + string database = 5; + repeated string setup = 6; + uint32 compression = 7; + bool disable_tls = 8; +} + message KafkaConfig { repeated string servers = 1; string username = 2; @@ -158,6 +169,7 @@ enum DBType { POSTGRES = 3; S3 = 5; SQLSERVER = 6; + MYSQL = 7; CLICKHOUSE = 8; KAFKA = 9; PUBSUB = 10; @@ -173,7 +185,6 @@ message Peer { BigqueryConfig bigquery_config = 4; MongoConfig mongo_config = 5; PostgresConfig postgres_config = 6; - EventHubConfig eventhub_config = 7; S3Config s3_config = 8; SqlServerConfig sqlserver_config = 9; EventHubGroupConfig eventhub_group_config = 10; @@ -181,5 +192,6 @@ message Peer { KafkaConfig kafka_config = 12; PubSubConfig pubsub_config = 13; ElasticsearchConfig elasticsearch_config = 14; + MySqlConfig mysql_config = 15; } } diff --git a/ui/app/api/peers/getTruePeer.ts b/ui/app/api/peers/getTruePeer.ts index 
e21323a07b..c8b5c55a37 100644 --- a/ui/app/api/peers/getTruePeer.ts +++ b/ui/app/api/peers/getTruePeer.ts @@ -6,6 +6,7 @@ import { EventHubConfig, EventHubGroupConfig, KafkaConfig, + MySqlConfig, Peer, PostgresConfig, PubSubConfig, @@ -26,6 +27,7 @@ export const getTruePeer = (peer: CatalogPeer) => { | EventHubConfig | EventHubGroupConfig | KafkaConfig + | MySqlConfig | PostgresConfig | PubSubConfig | S3Config @@ -45,10 +47,6 @@ export const getTruePeer = (peer: CatalogPeer) => { config = PostgresConfig.decode(options); newPeer.postgresConfig = config; break; - case 4: - config = EventHubConfig.decode(options); - newPeer.eventhubConfig = config; - break; case 5: config = S3Config.decode(options); newPeer.s3Config = config; @@ -58,8 +56,8 @@ export const getTruePeer = (peer: CatalogPeer) => { newPeer.sqlserverConfig = config; break; case 7: - config = EventHubGroupConfig.decode(options); - newPeer.eventhubGroupConfig = config; + config = MySqlConfig.decode(options); + newPeer.mysqlConfig = config; break; case 8: config = ClickhouseConfig.decode(options);