diff --git a/Cargo.lock b/Cargo.lock index 0ad992b47..f14b138a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -23,6 +23,17 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" +[[package]] +name = "aes" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0" +dependencies = [ + "cfg-if", + "cipher", + "cpufeatures", +] + [[package]] name = "ahash" version = "0.8.7" @@ -67,6 +78,54 @@ version = "0.2.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" +[[package]] +name = "amq-protocol" +version = "7.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3a41c091e49edfcc098b4f90d4d7706a8cf9158034e84ebfee7ff346092f67c" +dependencies = [ + "amq-protocol-tcp", + "amq-protocol-types", + "amq-protocol-uri", + "cookie-factory", + "nom", + "serde", +] + +[[package]] +name = "amq-protocol-tcp" +version = "7.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ed7a4a662472f88823ed2fc81babb0b00562f2c54284e3e7bffc02b6df649bf" +dependencies = [ + "amq-protocol-uri", + "tcp-stream", + "tracing", +] + +[[package]] +name = "amq-protocol-types" +version = "7.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd6484fdc918c1b6e2ae8eda2914d19a5873e1975f93ad8d33d6a24d1d98df05" +dependencies = [ + "cookie-factory", + "nom", + "serde", + "serde_json", +] + +[[package]] +name = "amq-protocol-uri" +version = "7.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f7f2da69e0e1182765bf33407cd8a843f20791b5af2b57a2645818c4776c56c" +dependencies = [ + "amq-protocol-types", + "percent-encoding", + "url", +] + [[package]] name = "android-tzdata" version = "0.1.1" @@ -547,6 +606,7 @@ dependencies = [ "glob", "governor", "itertools 0.14.0", + "lapin", "object_store", "parquet", "prost 0.13.4", @@ -970,7 +1030,7 @@ dependencies = [ "arroyo-udf-common", "proc-macro2", "quote", - "rand 0.9.0-beta.3", + "rand 0.9.0", "syn 2.0.96", ] @@ -1040,6 +1100,45 @@ dependencies = [ "tracing", ] +[[package]] +name = "asn1-rs" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "607495ec7113b178fbba7a6166a27f99e774359ef4823adbefd756b5b81d7970" +dependencies = [ + "asn1-rs-derive", + "asn1-rs-impl", + "displaydoc", + "nom", + "num-traits", + "rusticata-macros", + "thiserror 2.0.11", + "time", +] + +[[package]] +name = "asn1-rs-derive" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3109e49b1e4909e9db6515a30c633684d68cdeaa252f215214cb4fa1a5bfee2c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.96", + "synstructure", +] + +[[package]] +name = "asn1-rs-impl" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b18050c2cd6fe86c3a76584ef5e0baf286d038cda203eb6223df2cc413565f7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.96", +] + [[package]] name = "async-broadcast" version = "0.7.2" @@ -1101,8 +1200,8 @@ checksum = "30ca9a001c1e8ba5149f91a74362376cc6bc5b919d92d988668657bd570bdcec" dependencies = [ "async-task", "concurrent-queue", - "fastrand", - "futures-lite", + "fastrand 2.3.0", + "futures-lite 2.6.0", "slab", ] @@ -1134,32 +1233,72 @@ checksum = "05b1b633a2115cd122d73b955eadd9916c18c8f510ec9cd1686404c60ad1c29c" dependencies = [ "async-channel 2.3.1", "async-executor", - "async-io", - "async-lock", + "async-io 2.4.0", + "async-lock 3.4.0", "blocking", - "futures-lite", + "futures-lite 2.6.0", "once_cell", ] +[[package]] +name = "async-global-executor-trait" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80f19936c1a84fb48ceb8899b642d2a72572587d1021cc561bfb24de9f33ee89" +dependencies = [ + "async-global-executor", + "async-trait", + "executor-trait", +] + +[[package]] +name = "async-io" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" +dependencies = [ + "async-lock 2.8.0", + "autocfg", + "cfg-if", + "concurrent-queue", + "futures-lite 1.13.0", + "log", + "parking", + "polling 2.8.0", + "rustix 0.37.28", + "slab", + "socket2 0.4.10", + "waker-fn", +] + [[package]] name = "async-io" version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43a2b323ccce0a1d90b449fd71f2a06ca7faa7c54c2751f06c9bd851fc061059" dependencies = [ - "async-lock", + "async-lock 3.4.0", "cfg-if", "concurrent-queue", "futures-io", - "futures-lite", + "futures-lite 2.6.0", "parking", - "polling", - "rustix", + "polling 3.7.4", + "rustix 0.38.43", "slab", "tracing", "windows-sys 0.59.0", ] +[[package]] +name = "async-lock" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b" +dependencies = [ + "event-listener 2.5.3", +] + [[package]] name = "async-lock" version = "3.4.0" @@ -1213,9 +1352,9 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b948000fad4873c1c9339d60f2623323a0cfd3816e5181033c6a5cb68b2accf7" dependencies = [ - "async-io", + "async-io 2.4.0", "blocking", - "futures-lite", + "futures-lite 2.6.0", ] [[package]] @@ -1225,31 +1364,43 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "63255f1dc2381611000436537bbedfe83183faa303a5a0edaf191edef06526bb" dependencies = [ "async-channel 2.3.1", - "async-io", - "async-lock", + "async-io 2.4.0", + "async-lock 3.4.0", "async-signal", "async-task", "blocking", "cfg-if", "event-listener 5.4.0", - "futures-lite", - "rustix", + "futures-lite 2.6.0", + "rustix 0.38.43", "tracing", ] +[[package]] +name = "async-reactor-trait" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a6012d170ad00de56c9ee354aef2e358359deb1ec504254e0e5a3774771de0e" +dependencies = [ + "async-io 1.13.0", + "async-trait", + "futures-core", + "reactor-trait", +] + [[package]] name = "async-signal" version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "637e00349800c0bdf8bfc21ebbc0b6524abea702b0da4168ac00d070d0c0b9f3" dependencies = [ - "async-io", - "async-lock", + "async-io 2.4.0", + "async-lock 3.4.0", "atomic-waker", "cfg-if", "futures-core", "futures-io", - "rustix", + "rustix 0.38.43", "signal-hook-registry", "slab", "windows-sys 0.59.0", @@ -1263,14 +1414,14 @@ checksum = "c634475f29802fde2b8f0b505b1bd00dfe4df7d4a000f0b36f7671197d5c3615" dependencies = [ "async-channel 1.9.0", "async-global-executor", - "async-io", - "async-lock", + "async-io 2.4.0", + "async-lock 3.4.0", "async-process", "crossbeam-utils", "futures-channel", "futures-core", "futures-io", - "futures-lite", + "futures-lite 2.6.0", "gloo-timers", "kv-log-macro", "log", @@ -1381,7 +1532,7 @@ dependencies = [ "aws-smithy-types", "aws-types", "bytes", - "fastrand", + "fastrand 2.3.0", "hex", "http 0.2.12", "ring", @@ -1437,7 +1588,7 @@ dependencies = [ "aws-smithy-types", "aws-types", "bytes", - "fastrand", + "fastrand 2.3.0", "http 0.2.12", "http-body 0.4.6", "once_cell", @@ -1463,7 +1614,7 @@ dependencies = [ "aws-smithy-types", "aws-types", "bytes", - "fastrand", + "fastrand 2.3.0", "http 0.2.12", "once_cell", "regex-lite", @@ -1643,7 +1794,7 @@ dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", "bytes", - "fastrand", + "fastrand 2.3.0", "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", @@ -1790,7 +1941,7 @@ dependencies = [ "axum", "axum-core", "bytes", - "fastrand", + "fastrand 2.3.0", "futures-util", "headers", "http 1.2.0", @@ -1833,7 +1984,7 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba5289ec98f68f28dd809fd601059e6aa908bb8f6108620930828283d4ee23d7" dependencies = [ - "fastrand", + "fastrand 2.3.0", "tokio", ] @@ -1977,6 +2128,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "block-padding" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8894febbff9f758034a5b8e12d87918f56dfc64a8e1fe757d65e29041538d93" +dependencies = [ + "generic-array", +] + [[package]] name = "blocking" version = "1.6.1" @@ -1986,7 +2146,7 @@ dependencies = [ "async-channel 2.3.1", "async-task", "futures-io", - "futures-lite", + "futures-lite 2.6.0", "piper", ] @@ -2135,6 +2295,15 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "cbc" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b52a9543ae338f279b96b0b9fed9c8093744685043739079ce85cd58f289a6" +dependencies = [ + "cipher", +] + [[package]] name = "cc" version = "1.2.9" @@ -2204,6 +2373,16 @@ dependencies = [ "stacker", ] +[[package]] +name = "cipher" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" +dependencies = [ + "crypto-common", + "inout", +] + [[package]] name = "clap" version = "4.5.26" @@ -2268,6 +2447,18 @@ dependencies = [ "cc", ] +[[package]] +name = "cms" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b77c319abfd5219629c45c34c89ba945ed3c5e49fcde9d16b6c3885f118a730" +dependencies = [ + "const-oid", + "der", + "spki", + "x509-cert", +] + [[package]] name = "codegen_template" version = "0.1.0" @@ -2364,6 +2555,12 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" +[[package]] +name = "cookie-factory" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9885fa71e26b8ab7855e2ec7cae6e9b380edff76cd052e07c683a0319d51b3a2" + [[package]] name = "copy-artifacts" version = "0.14.0-dev" @@ -3277,10 +3474,37 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f55bf8e7b65898637379c1b74eb1551107c8294ed26d855ceb9fd1a09cfc9bc0" dependencies = [ "const-oid", + "der_derive", + "flagset", "pem-rfc7468", "zeroize", ] +[[package]] +name = "der-parser" +version = "10.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07da5016415d5a3c4dd39b11ed26f915f52fc4e0dc197d87908bc916e51bc1a6" +dependencies = [ + "asn1-rs", + "displaydoc", + "nom", + "num-bigint", + "num-traits", + "rusticata-macros", +] + +[[package]] +name = "der_derive" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8034092389675178f570469e6c3b0465d3d30b4505c294a6550db47f3c17ad18" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.96", +] + [[package]] name = "deranged" version = "0.3.11" @@ -3346,6 +3570,15 @@ dependencies = [ "syn 2.0.96", ] +[[package]] +name = "des" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffdd80ce8ce993de27e9f063a444a4d53ce8e8db4c1f00cc03af5ad5a9867a1e" +dependencies = [ + "cipher", +] + [[package]] name = "digest" version = "0.10.7" @@ -3412,6 +3645,12 @@ dependencies = [ "syn 2.0.96", ] +[[package]] +name = "doc-comment" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" + [[package]] name = "duct" version = "0.13.7" @@ -3567,6 +3806,15 @@ dependencies = [ "tokio", ] +[[package]] +name = "executor-trait" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13c39dff9342e4e0e16ce96be751eb21a94e94a87bb2f6e63ad1961c2ce109bf" +dependencies = [ + "async-trait", +] + [[package]] name = "eyre" version = "0.6.12" @@ -3595,6 +3843,15 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" +[[package]] +name = "fastrand" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" +dependencies = [ + "instant", +] + [[package]] name = "fastrand" version = "2.3.0" @@ -3669,6 +3926,12 @@ version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" +[[package]] +name = "flagset" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3ea1ec5f8307826a5b71094dd91fc04d4ae75d5709b20ad351c7fb4815c86ec" + [[package]] name = "flatbuffers" version = "24.12.23" @@ -3708,7 +3971,7 @@ checksum = "d325369b0fb41d003b70a329cffcff188fef38f613a210ff0cb7949bea97aebc" dependencies = [ "anyhow", "async-channel 1.9.0", - "async-lock", + "async-lock 3.4.0", "async-trait", "cfg-if", "chrono", @@ -3786,18 +4049,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0a28090046453db33a8bace0e1f71350b9878cd7fb576e48592ae8284bc83c7e" dependencies = [ "anyhow", - "async-io", + "async-io 2.4.0", "async-net", "async-std", "async-trait", "cfg-if", "fluvio-wasm-timer", - "futures-lite", + "futures-lite 2.6.0", "futures-util", "openssl", "openssl-sys", "pin-project", - "socket2", + "socket2 0.5.8", "thiserror 1.0.69", "tracing", "ws_stream_wasm", @@ -3886,7 +4149,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28fc32dca29a840c2a6cc648c38f1c39f41ae0b353b3309cca799f4c8b4d5829" dependencies = [ "async-channel 1.9.0", - "async-lock", + "async-lock 3.4.0", "async-trait", "built", "bytes", @@ -3932,7 +4195,7 @@ checksum = "f662e68aae9386cca21735f157d84a74ef5dcde18fdc643d2531a86e00cd9262" dependencies = [ "anyhow", "async-channel 1.9.0", - "async-lock", + "async-lock 3.4.0", "async-trait", "cfg-if", "fluvio-future", @@ -3954,7 +4217,7 @@ version = "0.11.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd883354fee545863d0983ef7bd74ad6b2d6aa58e74af3eafc6490939b8a16aa" dependencies = [ - "async-lock", + "async-lock 3.4.0", "event-listener 5.4.0", "k8-types", "once_cell", @@ -4090,13 +4353,28 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" +[[package]] +name = "futures-lite" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" +dependencies = [ + "fastrand 1.9.0", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + [[package]] name = "futures-lite" version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f5edaec856126859abb19ed65f39e90fea3a9574b9707f13539acf4abf7eb532" dependencies = [ - "fastrand", + "fastrand 2.3.0", "futures-core", "futures-io", "parking", @@ -4175,16 +4453,13 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.3.0-rc.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a78f88e84d239c7f2619ae8b091603c26208e1cb322571f5a29d6806f56ee5e" +checksum = "43a49c392881ce6d5c3b8cb70f98717b7c07aabbdff06687b9030dbfbe2725f8" dependencies = [ "cfg-if", - "js-sys", "libc", - "rustix", "wasi 0.13.3+wasi-0.2.2", - "wasm-bindgen", "windows-targets 0.52.6", ] @@ -4511,7 +4786,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2", + "socket2 0.5.8", "tokio", "tower-service", "tracing", @@ -4649,7 +4924,7 @@ dependencies = [ "http-body 1.0.1", "hyper 1.5.2", "pin-project-lite", - "socket2", + "socket2 0.5.8", "tokio", "tower-service", "tracing", @@ -4870,6 +5145,16 @@ version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c8fae54786f62fb2918dcfae3d568594e50eb9b5c25bf04371af6fe7516452fb" +[[package]] +name = "inout" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5" +dependencies = [ + "block-padding", + "generic-array", +] + [[package]] name = "instant" version = "0.1.13" @@ -4918,6 +5203,17 @@ dependencies = [ "rustversion", ] +[[package]] +name = "io-lifetimes" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" +dependencies = [ + "hermit-abi 0.3.9", + "libc", + "windows-sys 0.48.0", +] + [[package]] name = "ipnet" version = "2.10.1" @@ -5236,6 +5532,28 @@ dependencies = [ "log", ] +[[package]] +name = "lapin" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "209b09a06f4bd4952a0fd0594f90d53cf4496b062f59acc838a2823e1bb7d95c" +dependencies = [ + "amq-protocol", + "async-global-executor-trait", + "async-reactor-trait", + "async-trait", + "executor-trait", + "flume", + "futures-core", + "futures-io", + "parking_lot 0.12.3", + "pinky-swear", + "reactor-trait", + "serde", + "tracing", + "waker-fn", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -5434,6 +5752,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "linux-raw-sys" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" + [[package]] name = "linux-raw-sys" version = "0.4.15" @@ -5983,6 +6307,15 @@ dependencies = [ "walkdir", ] +[[package]] +name = "oid-registry" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "264c56d1492c13e769662197fb6b94e0a52abe52d27efac374615799a4bf453d" +dependencies = [ + "asn1-rs", +] + [[package]] name = "once_cell" version = "1.20.2" @@ -6126,6 +6459,28 @@ version = "3.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f" +[[package]] +name = "p12-keystore" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a09eaa3a6d8884c204c2ab17e313f563b524362e62567f09ba27857a6e31257f" +dependencies = [ + "cbc", + "cms", + "der", + "des", + "hex", + "hmac", + "pkcs12", + "pkcs5", + "rand 0.9.0", + "rc2", + "sha1", + "sha2", + "thiserror 2.0.11", + "x509-parser", +] + [[package]] name = "parking" version = "2.2.1" @@ -6236,6 +6591,16 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df94ce210e5bc13cb6651479fa48d14f601d9858cfe0467f43ae157023b938d3" +[[package]] +name = "pbkdf2" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8ed6a7761f76e3b9f92dfb0a60a6a6477c61024b775147ff0973a02653abaf2" +dependencies = [ + "digest", + "hmac", +] + [[package]] name = "pear" version = "0.2.9" @@ -6431,6 +6796,18 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pinky-swear" +version = "6.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6cfae3ead413ca051a681152bd266438d3bfa301c9bdf836939a14c721bb2a21" +dependencies = [ + "doc-comment", + "flume", + "parking_lot 0.12.3", + "tracing", +] + [[package]] name = "piper" version = "0.2.4" @@ -6438,10 +6815,40 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96c8c490f422ef9a4efd2cb5b42b76c8613d7e7dfc1caf667b8a3350a5acc066" dependencies = [ "atomic-waker", - "fastrand", + "fastrand 2.3.0", "futures-io", ] +[[package]] +name = "pkcs12" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "695b3df3d3cc1015f12d70235e35b6b79befc5fa7a9b95b951eab1dd07c9efc2" +dependencies = [ + "cms", + "const-oid", + "der", + "digest", + "spki", + "x509-cert", + "zeroize", +] + +[[package]] +name = "pkcs5" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e847e2c91a18bfa887dd028ec33f2fe6f25db77db3619024764914affe8b69a6" +dependencies = [ + "aes", + "cbc", + "der", + "pbkdf2", + "scrypt", + "sha2", + "spki", +] + [[package]] name = "pkcs8" version = "0.10.2" @@ -6458,6 +6865,22 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2" +[[package]] +name = "polling" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" +dependencies = [ + "autocfg", + "bitflags 1.3.2", + "cfg-if", + "concurrent-queue", + "libc", + "log", + "pin-project-lite", + "windows-sys 0.48.0", +] + [[package]] name = "polling" version = "3.7.4" @@ -6468,7 +6891,7 @@ dependencies = [ "concurrent-queue", "hermit-abi 0.4.0", "pin-project-lite", - "rustix", + "rustix 0.38.43", "tracing", "windows-sys 0.59.0", ] @@ -6675,7 +7098,7 @@ dependencies = [ "hex", "lazy_static", "procfs-core", - "rustix", + "rustix 0.38.43", ] [[package]] @@ -7031,7 +7454,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls 0.23.21", - "socket2", + "socket2 0.5.8", "thiserror 2.0.11", "tokio", "tracing", @@ -7066,7 +7489,7 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2", + "socket2 0.5.8", "tracing", "windows-sys 0.59.0", ] @@ -7137,12 +7560,12 @@ dependencies = [ [[package]] name = "rand" -version = "0.9.0-beta.3" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6fccbfebb3972a41a31c605a59207d9fba5489b9a87d9d87024cb6df73a32ec7" +checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94" dependencies = [ - "rand_chacha 0.9.0-beta.1", - "rand_core 0.9.0-beta.1", + "rand_chacha 0.9.0", + "rand_core 0.9.0", "zerocopy 0.8.14", ] @@ -7158,12 +7581,12 @@ dependencies = [ [[package]] name = "rand_chacha" -version = "0.9.0-beta.1" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f16da77124f4ee9fabd55ce6540866e9101431863b4876de58b68797f331adf2" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" dependencies = [ "ppv-lite86", - "rand_core 0.9.0-beta.1", + "rand_core 0.9.0", ] [[package]] @@ -7177,11 +7600,11 @@ dependencies = [ [[package]] name = "rand_core" -version = "0.9.0-beta.1" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a98fa0b8309344136abe6244130311e76997e546f76fae8054422a7539b43df7" +checksum = "b08f3c9802962f7e1b25113931d94f43ed9725bebc59db9d0c3e9a23b67e15ff" dependencies = [ - "getrandom 0.3.0-rc.0", + "getrandom 0.3.1", "zerocopy 0.8.14", ] @@ -7194,6 +7617,15 @@ dependencies = [ "bitflags 2.7.0", ] +[[package]] +name = "rc2" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62c64daa8e9438b84aaae55010a93f396f8e60e3911590fcba770d04643fc1dd" +dependencies = [ + "cipher", +] + [[package]] name = "rdkafka" version = "0.37.0" @@ -7229,6 +7661,17 @@ dependencies = [ "zstd-sys", ] +[[package]] +name = "reactor-trait" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "438a4293e4d097556730f4711998189416232f009c137389e0f961d2bc0ddc58" +dependencies = [ + "async-trait", + "futures-core", + "futures-io", +] + [[package]] name = "redis" version = "0.28.1" @@ -7255,7 +7698,7 @@ dependencies = [ "rustls-pki-types", "ryu", "sha1_smol", - "socket2", + "socket2 0.5.8", "tokio", "tokio-rustls 0.26.1", "tokio-util", @@ -7656,6 +8099,29 @@ dependencies = [ "semver", ] +[[package]] +name = "rusticata-macros" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "faf0c4a6ece9950b9abdb62b1cfcf2a68b3b67a10ba445b3bb85be2a293d0632" +dependencies = [ + "nom", +] + +[[package]] +name = "rustix" +version = "0.37.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "519165d378b97752ca44bbe15047d5d3409e875f39327546b42ac81d7e18c1b6" +dependencies = [ + "bitflags 1.3.2", + "errno", + "io-lifetimes", + "libc", + "linux-raw-sys 0.3.8", + "windows-sys 0.48.0", +] + [[package]] name = "rustix" version = "0.38.43" @@ -7665,7 +8131,7 @@ dependencies = [ "bitflags 2.7.0", "errno", "libc", - "linux-raw-sys", + "linux-raw-sys 0.4.15", "windows-sys 0.59.0", ] @@ -7710,6 +8176,19 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-connector" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a980454b497c439c274f2feae2523ed8138bbd3d323684e1435fec62f800481" +dependencies = [ + "log", + "rustls 0.23.21", + "rustls-native-certs 0.7.3", + "rustls-pki-types", + "rustls-webpki 0.102.8", +] + [[package]] name = "rustls-native-certs" version = "0.6.3" @@ -7807,6 +8286,15 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" +[[package]] +name = "salsa20" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97a22f5af31f73a954c10289c93e8a50cc23d971e80ee446f1f6f7137a088213" +dependencies = [ + "cipher", +] + [[package]] name = "same-file" version = "1.0.6" @@ -7869,6 +8357,17 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "scrypt" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0516a385866c09368f0b5bcd1caff3366aace790fcd46e2bb032697bb172fd1f" +dependencies = [ + "pbkdf2", + "salsa20", + "sha2", +] + [[package]] name = "sct" version = "0.7.1" @@ -8287,6 +8786,16 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" +[[package]] +name = "socket2" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "socket2" version = "0.5.8" @@ -8554,6 +9063,18 @@ version = "0.12.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61c41af27dd6d1e27b1b16b489db798443478cef1f06a660c96db617ba5de3b1" +[[package]] +name = "tcp-stream" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "495b0abdce3dc1f8fd27240651c9e68890c14e9d9c61527b1ce44d8a5a7bd3d5" +dependencies = [ + "cfg-if", + "p12-keystore", + "rustls-connector", + "rustls-pemfile 2.2.0", +] + [[package]] name = "tempfile" version = "3.15.0" @@ -8561,10 +9082,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a8a559c81686f576e8cd0290cd2a24a2a9ad80c98b3478856500fcbd7acd704" dependencies = [ "cfg-if", - "fastrand", + "fastrand 2.3.0", "getrandom 0.2.15", "once_cell", - "rustix", + "rustix 0.38.43", "windows-sys 0.59.0", ] @@ -8815,7 +9336,7 @@ dependencies = [ "parking_lot 0.12.3", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.5.8", "tokio-macros", "tracing", "windows-sys 0.52.0", @@ -8872,7 +9393,7 @@ dependencies = [ "postgres-protocol", "postgres-types", "rand 0.8.5", - "socket2", + "socket2 0.5.8", "tokio", "tokio-util", "whoami", @@ -9026,7 +9547,7 @@ dependencies = [ "percent-encoding", "pin-project", "prost 0.13.4", - "socket2", + "socket2 0.5.8", "tokio", "tokio-stream", "tower", @@ -9631,6 +10152,12 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" +[[package]] +name = "waker-fn" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "317211a0dc0ceedd78fb2ca9a44aed3d7b9b26f81870d485c07122b4350673b7" + [[package]] name = "walkdir" version = "2.5.0" @@ -10148,6 +10675,34 @@ dependencies = [ "tap", ] +[[package]] +name = "x509-cert" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1301e935010a701ae5f8655edc0ad17c44bad3ac5ce8c39185f75453b720ae94" +dependencies = [ + "const-oid", + "der", + "spki", +] + +[[package]] +name = "x509-parser" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4569f339c0c402346d4a75a9e39cf8dad310e287eef1ff56d4c68e5067f53460" +dependencies = [ + "asn1-rs", + "data-encoding", + "der-parser", + "lazy_static", + "nom", + "oid-registry", + "rusticata-macros", + "thiserror 2.0.11", + "time", +] + [[package]] name = "xmlparser" version = "0.13.6" diff --git a/crates/arroyo-connectors/Cargo.toml b/crates/arroyo-connectors/Cargo.toml index 0c1f2a521..0307e1542 100644 --- a/crates/arroyo-connectors/Cargo.toml +++ b/crates/arroyo-connectors/Cargo.toml @@ -89,5 +89,8 @@ rustls = "0.22" # NATS async-nats = "0.38.0" +# AMQP +lapin = "2.5.0" + [build-dependencies] glob = "0.3" diff --git a/crates/arroyo-connectors/src/amqp/mod.rs b/crates/arroyo-connectors/src/amqp/mod.rs new file mode 100644 index 000000000..e880b8477 --- /dev/null +++ b/crates/arroyo-connectors/src/amqp/mod.rs @@ -0,0 +1,125 @@ +use anyhow::anyhow; +use arroyo_operator::connector::{Connection, Connector}; +use arroyo_operator::operator::ConstructedOperator; +use arroyo_rpc::api_types::connections::{ + ConnectionProfile, ConnectionSchema, ConnectionType, TestSourceMessage, +}; +use arroyo_rpc::{ConnectorOptions, OperatorConfig}; + +use crate::EmptyConfig; +mod operator; + +use crate::amqp::operator::AmqpSourceFunc; +pub struct AmqpConnector {} + +static ICON: &str = &"placeholdersvg"; + +impl Connector for AmqpConnector { + type ProfileT = EmptyConfig; + type TableT = EmptyConfig; + + fn name(&self) -> &'static str { + "amqp" + } + + fn metadata(&self) -> arroyo_rpc::api_types::connections::Connector { + arroyo_rpc::api_types::connections::Connector { + id: self.name().to_string(), + name: "AMQP".to_string(), + icon: ICON.to_string(), + description: "Source of AMQP messages".to_string(), + enabled: true, + source: true, + sink: false, + testing: false, + hidden: false, + custom_schemas: true, + connection_config: None, + table_config: "{\"type\": \"object\", \"title\": \"Amqp\"}".to_string(), + } + } + + fn table_type(&self, _: Self::ProfileT, _: Self::TableT) -> ConnectionType { + ConnectionType::Source + } + + fn get_schema( + &self, + _: Self::ProfileT, + _: Self::TableT, + s: Option<&ConnectionSchema>, + ) -> Option { + s.cloned() + } + + fn test( + &self, + _: &str, + _: Self::ProfileT, + _: Self::TableT, + _: Option<&ConnectionSchema>, + tx: tokio::sync::mpsc::Sender, + ) { + tokio::task::spawn(async move { + let message = TestSourceMessage { + error: false, + done: true, + message: "Successfully validated connection".to_string(), + }; + tx.send(message).await.unwrap(); + }); + } + + fn from_options( + &self, + name: &str, + _options: &mut ConnectorOptions, + schema: Option<&ConnectionSchema>, + _profile: Option<&ConnectionProfile>, + ) -> anyhow::Result { + self.from_config(None, name, EmptyConfig {}, EmptyConfig {}, schema) + } + + fn from_config( + &self, + id: Option, + name: &str, + config: Self::ProfileT, + table: Self::TableT, + s: Option<&ConnectionSchema>, + ) -> anyhow::Result { + let description = "Amqp".to_string(); + + let config = OperatorConfig { + connection: serde_json::to_value(config).unwrap(), + table: serde_json::to_value(table).unwrap(), + rate_limit: None, + format: None, + bad_data: None, + framing: None, + metadata_fields: vec![], + }; + + Ok(Connection::new( + id, + self.name(), + name.to_string(), + ConnectionType::Source, + s.cloned() + .ok_or_else(|| anyhow!("no schema for amqp source"))?, + &config, + description, + )) + } + + fn make_operator( + &self, + _: Self::ProfileT, + _: Self::TableT, + _: OperatorConfig, + ) -> anyhow::Result { + Ok(ConstructedOperator::from_operator(Box::new( + AmqpSourceFunc::new(), + ))) + } +} diff --git a/crates/arroyo-connectors/src/amqp/operator.rs b/crates/arroyo-connectors/src/amqp/operator.rs new file mode 100644 index 000000000..6d92dca06 --- /dev/null +++ b/crates/arroyo-connectors/src/amqp/operator.rs @@ -0,0 +1,262 @@ +use arrow::array::RecordBatch; +use arroyo_formats::de::FieldValueType; +use arroyo_operator::operator::ArrowOperator; +use arroyo_operator::{ + context::{Collector, OperatorContext, SourceCollector, SourceContext}, + operator::SourceOperator, + SourceFinishType, +}; +use arroyo_rpc::formats::{BadData, Format, Framing}; +use arroyo_rpc::grpc::rpc::TableConfig; +use arroyo_rpc::schema_resolver::SchemaResolver; +use arroyo_rpc::{grpc::rpc::StopMode, ControlMessage, MetadataField}; +use arroyo_state::global_table_config; +use arroyo_state::tables::global_keyed_map::GlobalKeyedView; +use arroyo_types::*; +use async_trait::async_trait; +use futures::StreamExt; +use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties, Result}; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; +use tokio::select; +use tokio::time::MissedTickBehavior; +use tracing::{debug, error, info}; + +#[derive(Debug)] +pub struct AmqpSourceFunc { + pub address: String, + pub topic: String, + pub format: Format, + pub framing: Option, + pub bad_data: Option, + pub metadata_fields: Vec, + pub schema_resolver: Option>, +} + +#[derive(Clone, Debug)] +pub struct AmqpState { + pub delivery_tag: u64, + pub offset: u64, +} + +impl AmqpSourceFunc { + pub fn new() -> Self { + Self { + address: todo!(), + topic: todo!(), + format: todo!(), + framing: todo!(), + bad_data: todo!(), + metadata_fields: todo!(), + schema_resolver: todo!(), + } + } + /// Manages the main loop for consuming messages from the AMQP stream + /// It first creates a connection and handles any errors that occur during this process. + /// It sets up a ticker to periodically flush the message collector's buffer. + /// Inside the loop, it uses the select! macro to handle different asynchronous events: receiving a message from the consumer, ticking the flush ticker, or receiving control messages from the context. + /// Depending on the event, it processes the message, flushes the buffer if needed, handles errors, or processes control messages such as checkpoints, stopping, committing, or loading compacted data. + /// The function ensures that the stream is read and processed continuously until a stop condition is met. + async fn run_int( + &mut self, + ctx: &mut SourceContext, + collector: &mut SourceCollector, + ) -> Result { + let consumer = self.get_consumer(ctx).await?; + // .map_err(|e| UserError::new("Failed to consume from queue", e.to_string()))?; + + // todo might add governor if rate limiting bevcomes necessary https://crates.io/crates/governor + let mut flush_ticker = tokio::time::interval(Duration::from_millis(50)); + flush_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); + + if let Some(schema_resolver) = &self.schema_resolver { + collector.initialize_deserializer_with_resolver( + self.format.clone(), + self.framing.clone(), + self.bad_data.clone(), + &self.metadata_fields, + schema_resolver.clone(), + ); + } else { + collector.initialize_deserializer( + self.format.clone(), + self.framing.clone(), + self.bad_data.clone(), + &self.metadata_fields, + ); + } + + let state: &mut GlobalKeyedView = ctx + .table_manager + .get_global_keyed_state("q") + .await + .expect("should have table"); + + loop { + select! { + delivery_result = consumer.next() => { + match delivery_result { + Some(Ok(delivery)) => { + // Extract message payload + let data: Vec = delivery.data.clone(); + + // Extract timestamp (if exists) + let timestamp: u64 = delivery.properties.timestamp() + .unwrap_or_else(|| chrono::Utc::now().timestamp_millis() as u64); // Default to current time + + // Extract metadata fields (equivalent to Kafka's metadata fields) + let mut connector_metadata = HashMap::new(); + connector_metadata.insert("exchange", FieldValueType::String(delivery.exchange.as_str().into())); + connector_metadata.insert("routing_key", FieldValueType::String(delivery.routing_key.as_str().into())); + connector_metadata.insert("timestamp", FieldValueType::Int64(Some(timestamp.try_into().unwrap()))); + + // Deserialize and process the message + collector.deserialize_slice(&data, from_millis(timestamp), Some(&connector_metadata)).await; + + if collector.should_flush() { + collector.flush_buffer().await; + } + + // Store last processed offset (RabbitMQ uses delivery_tag instead of offset) + offsets.insert(delivery.delivery_tag, delivery.delivery_tag); + + // Acknowledge the message + delivery.ack(BasicAckOptions::default()).await?; + }, + Some(Err(err)) => { + error!("Encountered AMQP error: {:?}", err); + }, + None => { + tokio::time::sleep(Duration::from_millis(500)).await; + } + } + } + _ = flush_ticker.tick() => { + if collector.should_flush() { + collector.flush_buffer().await; + } + } + control_message = ctx.control_rx.recv() => { + match control_message { + Some(ControlMessage::Checkpoint(c)) => { + debug!("Starting checkpoint {}", ctx.task_info.task_index); + + let s = ctx.table_manager.get_global_keyed_state("q").await + .map_err(|err| UserError::new("Failed to get global key value", err.to_string()))?; + + // todo need to fix this with offsets + for (&delivery_tag, &offset) in &offsets { + s.insert(delivery_tag, AmqpState{ + delivery_tag, + offset: offset + 1, // Simulating Kafka's offset increment + }).await; + } + + if self.start_checkpoint(c, ctx, collector).await { + return Ok(SourceFinishType::Immediate); + } + }, + + Some(ControlMessage::Stop { mode }) => { + info!("Stopping RabbitMQ source: {:?}", mode); + return Ok(match mode { + StopMode::Graceful => SourceFinishType::Graceful, + StopMode::Immediate => SourceFinishType::Immediate, + }); + } + + Some(ControlMessage::LoadCompacted { compacted }) => { + ctx.load_compacted(compacted).await; + } + + Some(ControlMessage::Commit { .. }) => { + unreachable!("sources shouldn't receive commit messages"); + }, + + Some(ControlMessage::NoOp) | None => {} + } + } + } + } + } + + async fn get_consumer(&mut self, ctx: &mut SourceContext) -> Result { + let conn = Connection::connect(&self.address, ConnectionProperties::default()).await?; + let channel = conn.create_channel().await.expect("create_channel"); + let queue_name = format!( + "amqp-arroyo-{}-{}", + ctx.task_info.job_id, ctx.task_info.operator_id + ); + let consumer_name = format!( + "arroyo-{}-{}-consumer", + ctx.task_info.job_id, ctx.task_info.operator_id + ); + let queue = channel + .queue_declare( + &queue_name, + QueueDeclareOptions::default(), + FieldTable::default(), + ) + .await + .expect("queue_declare"); + let consumer = channel + .basic_consume( + &queue_name, + &consumer_name, + BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await?; + + Ok(consumer) + } +} + +#[async_trait] +impl ArrowOperator for AmqpSourceFunc { + fn name(&self) -> String { + "AmqpSource".to_string() + } + + async fn process_batch( + &mut self, + _: RecordBatch, + _: &mut OperatorContext, + _: &mut dyn Collector, + ) { + // no-op + } +} + +#[async_trait] +impl SourceOperator for AmqpSourceFunc { + fn name(&self) -> String { + format!("amqp-lapin-{}", self.topic) + } + + fn tables(&self) -> HashMap { + global_table_config("q", "AMQP table") + } + + async fn run( + &mut self, + ctx: &mut arroyo_operator::context::SourceContext, + collector: &mut SourceCollector, + ) -> SourceFinishType { + collector.initialize_deserializer( + self.format.clone(), + self.framing.clone(), + self.bad_data.clone(), + &[], + ); + + match self.run_int(ctx, collector).await { + Ok(r) => r, + Err(e) => { + ctx.report_error(e, "failed to configure the AMQP source") + .await + } + } + } +} diff --git a/crates/arroyo-connectors/src/kafka/source/mod.rs b/crates/arroyo-connectors/src/kafka/source/mod.rs index 083b68b39..d880a6dbb 100644 --- a/crates/arroyo-connectors/src/kafka/source/mod.rs +++ b/crates/arroyo-connectors/src/kafka/source/mod.rs @@ -220,7 +220,7 @@ impl KafkaSourceFunc { collector.deserialize_slice(v, from_millis(timestamp.max(0) as u64), connector_metadata.as_ref()).await?; if collector.should_flush() { - collector.flush_buffer().await?; + collector.flush_buffer().await; } offsets.insert(msg.partition(), msg.offset()); diff --git a/crates/arroyo-connectors/src/lib.rs b/crates/arroyo-connectors/src/lib.rs index 5f53fdf7a..cb5cf349b 100644 --- a/crates/arroyo-connectors/src/lib.rs +++ b/crates/arroyo-connectors/src/lib.rs @@ -14,6 +14,7 @@ use std::time::Duration; use tokio::sync::mpsc::Sender; use tracing::warn; +pub mod amqp; pub mod blackhole; pub mod confluent; pub mod filesystem; @@ -36,6 +37,7 @@ pub mod websocket; pub fn connectors() -> HashMap<&'static str, Box> { let connectors: Vec> = vec![ + Box::new(amqp::AmqpConnector {}), Box::new(blackhole::BlackholeConnector {}), Box::new(confluent::ConfluentConnector {}), Box::new(filesystem::delta::DeltaLakeConnector {}),