diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0c20bcc498..872c859304 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -4,7 +4,9 @@ on: push: branches: [main, release/*] pull_request: - branches: [main] + branches: + - "main" + - "release/*" jobs: build: @@ -31,7 +33,6 @@ jobs: - uses: actions/checkout@v3 with: submodules: recursive - token: ${{ secrets.SUBMODULE_CHECKOUT }} - uses: dtolnay/rust-toolchain@stable diff --git a/.github/workflows/dev-debian.yml b/.github/workflows/dev-debian.yml deleted file mode 100644 index acc2994aa3..0000000000 --- a/.github/workflows/dev-debian.yml +++ /dev/null @@ -1,48 +0,0 @@ -name: Dev Debian package - -on: - push: - branches: [main] - pull_request: - branches: [main] - -jobs: - release: - name: build and release - strategy: - matrix: - runner: [ubuntu-latest, ubicloud] - runs-on: ${{ matrix.runner }} - steps: - - name: checkout sources - uses: actions/checkout@v3 - with: - submodules: recursive - token: ${{ secrets.SUBMODULE_CHECKOUT }} - - - name: install system tools - run: | - sudo apt-get update - sudo apt-get install -y musl-tools protobuf-compiler gcc-multilib \ - protobuf-compiler libssl-dev pkg-config build-essential - - - name: install cargo binstall - run: | - curl -L --proto '=https' --tlsv1.2 -sSf\ - https://raw.githubusercontent.com/cargo-bins/cargo-binstall/main/install-from-binstall-release.sh | bash - - - name: install cargo binaries - run: | - cargo binstall --no-confirm --no-symlinks cargo-deb - - - uses: dtolnay/rust-toolchain@stable - with: - targets: x86_64-unknown-linux-musl - - - name: build project release - working-directory: ./nexus - run: cargo build --release --target=x86_64-unknown-linux-musl - - - name: create peerdb-server deb package - working-directory: ./nexus/ - run: cargo deb --target=x86_64-unknown-linux-musl -p peerdb-server --no-build diff --git a/.github/workflows/dev-docker.yml b/.github/workflows/dev-docker.yml index 65e9c7999c..0fd90882a8 100644 --- a/.github/workflows/dev-docker.yml +++ b/.github/workflows/dev-docker.yml @@ -3,14 +3,12 @@ name: Dev Docker Images on: push: branches: [main] - pull_request: - branches: [main] jobs: docker-build: strategy: matrix: - runner: [ubuntu-latest, ubicloud] + runner: [ubicloud] runs-on: ${{ matrix.runner }} permissions: contents: read @@ -20,7 +18,6 @@ jobs: uses: actions/checkout@v3 with: submodules: recursive - token: ${{ secrets.SUBMODULE_CHECKOUT }} - uses: depot/setup-action@v1 diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index 80b5364891..04698de4d1 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -2,7 +2,8 @@ name: Flow build and test on: pull_request: - branches: [main] + branches: + - "main" push: branches: [main] @@ -34,7 +35,7 @@ jobs: - uses: actions/setup-go@v3 with: - go-version: '>=1.19.0' + go-version: ">=1.19.0" - name: install gotestsum run: | @@ -73,7 +74,7 @@ jobs: - name: run tests run: | - gotestsum --format testname -- -p 4 ./... -timeout 1200s + gotestsum --format testname -- -p 4 ./... -timeout 2400s working-directory: ./flow env: AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} diff --git a/.github/workflows/golang-lint.yml b/.github/workflows/golang-lint.yml index 4b1821b7fe..fd685d8c44 100644 --- a/.github/workflows/golang-lint.yml +++ b/.github/workflows/golang-lint.yml @@ -1,6 +1,9 @@ name: GolangCI-Lint -on: [pull_request] +on: + pull_request: + branches: + - "main" jobs: golangci-lint: @@ -10,14 +13,13 @@ jobs: pull-requests: write strategy: matrix: - runner: [ubuntu-latest, ubicloud] + runner: [ubicloud] runs-on: ${{ matrix.runner }} steps: - name: checkout uses: actions/checkout@v3 with: submodules: recursive - token: ${{ secrets.SUBMODULE_CHECKOUT }} - name: golangci-lint uses: reviewdog/action-golangci-lint@v2 diff --git a/.github/workflows/rust-lint.yml b/.github/workflows/rust-lint.yml index 029160274e..01c3dde174 100644 --- a/.github/workflows/rust-lint.yml +++ b/.github/workflows/rust-lint.yml @@ -1,5 +1,10 @@ name: clippy-action -on: [pull_request] + +on: + pull_request: + branches: + - "main" + jobs: clippy: permissions: @@ -8,23 +13,18 @@ jobs: pull-requests: write strategy: matrix: - runner: [ubuntu-latest, ubicloud] + runner: [ubicloud-standard-4] runs-on: ${{ matrix.runner }} steps: - - name: checkout - uses: actions/checkout@v3 - with: - submodules: recursive - token: ${{ secrets.SUBMODULE_CHECKOUT }} + - name: checkout + uses: actions/checkout@v3 + with: + submodules: recursive - - uses: dtolnay/rust-toolchain@stable - with: - components: clippy + - uses: dtolnay/rust-toolchain@stable + with: + components: clippy - - uses: giraffate/clippy-action@v1 - with: - reporter: 'github-pr-review' - github_token: ${{ secrets.GITHUB_TOKEN }} - workdir: ./nexus - env: - REVIEWDOG_TOKEN: ${{ secrets.REVIEWDOG_TOKEN }} + - name: clippy + run: cargo clippy -- -D warnings + working-directory: ./nexus diff --git a/.github/workflows/stable-debian.yml b/.github/workflows/stable-debian.yml deleted file mode 100644 index b58afce10e..0000000000 --- a/.github/workflows/stable-debian.yml +++ /dev/null @@ -1,62 +0,0 @@ -name: Stable Debian package - -on: - push: - tags: - - 'v[0-9]+.[0-9]+.[0-9]+' - -jobs: - release: - name: build and release - strategy: - matrix: - runner: [ubuntu-latest, ubicloud] - runs-on: ${{ matrix.runner }} - permissions: - contents: write - steps: - - name: checkout sources - uses: actions/checkout@v3 - with: - submodules: recursive - token: ${{ secrets.SUBMODULE_CHECKOUT }} - - - name: install system tools - run: | - sudo apt-get update - sudo apt-get install -y musl-tools protobuf-compiler gcc-multilib \ - protobuf-compiler libssl-dev pkg-config build-essential - - - name: install cargo binstall - run: | - curl -L --proto '=https' --tlsv1.2 -sSf\ - https://raw.githubusercontent.com/cargo-bins/cargo-binstall/main/install-from-binstall-release.sh | bash - - - name: install cargo binaries - run: | - cargo binstall --no-confirm --no-symlinks cargo-deb - - - uses: dtolnay/rust-toolchain@stable - with: - targets: x86_64-unknown-linux-musl - - - name: Set Cargo version as Git tag - working-directory: ./nexus/server - run: | - export VERSION=$(echo "${{ github.ref_name }}" | sed 's/^v//') - sed -i "s/0.1.0/$VERSION/g" Cargo.toml - - - name: build project release - working-directory: ./nexus - run: cargo build --release --target=x86_64-unknown-linux-musl - - - name: create peerdb-server deb package - working-directory: ./nexus/ - run: cargo deb --target=x86_64-unknown-linux-musl -p peerdb-server --no-build - - - name: upload release artifacts - uses: softprops/action-gh-release@v1 - if: startsWith(github.ref, 'refs/tags/') - with: - files: | - nexus/target/x86_64-unknown-linux-musl/debian/peerdb-server*.deb diff --git a/.github/workflows/stable-docker.yml b/.github/workflows/stable-docker.yml index 8ca0b96522..ad64b29428 100644 --- a/.github/workflows/stable-docker.yml +++ b/.github/workflows/stable-docker.yml @@ -9,7 +9,7 @@ jobs: docker-build: strategy: matrix: - runner: [ubuntu-latest, ubicloud] + runner: [ubicloud] runs-on: ${{ matrix.runner }} permissions: contents: read @@ -19,7 +19,6 @@ jobs: uses: actions/checkout@v3 with: submodules: recursive - token: ${{ secrets.SUBMODULE_CHECKOUT }} - uses: depot/setup-action@v1 diff --git a/.github/workflows/ui-build.yml b/.github/workflows/ui-build.yml index b1acc0de4f..41a4e33957 100644 --- a/.github/workflows/ui-build.yml +++ b/.github/workflows/ui-build.yml @@ -13,7 +13,7 @@ jobs: name: Build & Test UI strategy: matrix: - runner: [ubuntu-latest, ubicloud] + runner: [ubicloud] runs-on: ${{ matrix.runner }} steps: - name: checkout @@ -25,4 +25,4 @@ jobs: - name: Build working-directory: ui - run: yarn build \ No newline at end of file + run: yarn build diff --git a/.github/workflows/ui-lint.yml b/.github/workflows/ui-lint.yml index d8f2e58538..5c697aa3bf 100644 --- a/.github/workflows/ui-lint.yml +++ b/.github/workflows/ui-lint.yml @@ -17,7 +17,7 @@ jobs: name: Run UI linters strategy: matrix: - runner: [ubuntu-latest, ubicloud] + runner: [ubicloud] runs-on: ${{ matrix.runner }} steps: - name: checkout @@ -33,4 +33,4 @@ jobs: eslint: true prettier: true eslint_dir: ui - prettier_dir: ui \ No newline at end of file + prettier_dir: ui diff --git a/nexus/parser/src/lib.rs b/nexus/parser/src/lib.rs index 18716b66e9..f5b2aac340 100644 --- a/nexus/parser/src/lib.rs +++ b/nexus/parser/src/lib.rs @@ -23,7 +23,7 @@ pub struct NexusQueryParser { pub enum NexusStatement { PeerDDL { stmt: Statement, - ddl: PeerDDL, + ddl: Box, }, PeerQuery { stmt: Statement, @@ -55,7 +55,7 @@ impl NexusStatement { if let Some(ddl) = ddl { return Ok(NexusStatement::PeerDDL { stmt: stmt.clone(), - ddl, + ddl: Box::new(ddl), }); } @@ -100,8 +100,7 @@ impl NexusQueryParser { let peers = tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move { let catalog = self.catalog.lock().await; - let peers = catalog.get_peers().await; - peers + catalog.get_peers().await }) }); diff --git a/nexus/peer-bigquery/src/ast.rs b/nexus/peer-bigquery/src/ast.rs index dd020a6684..075bcc09c5 100644 --- a/nexus/peer-bigquery/src/ast.rs +++ b/nexus/peer-bigquery/src/ast.rs @@ -78,17 +78,19 @@ impl BigqueryAst { visit_function_arg_mut(query, |node| { if let FunctionArgExpr::Expr(arg_expr) = node { - if let Expr::Cast { expr: _, data_type } = arg_expr { - if let DataType::Array(_) = data_type { - let list = self - .flatten_expr_to_in_list(&arg_expr) - .expect("failed to flatten in function"); - let rewritten_array = Array { - elem: list, - named: true, - }; - *node = FunctionArgExpr::Expr(Expr::Array(rewritten_array)); - } + if let Expr::Cast { + expr: _, + data_type: DataType::Array(_), + } = arg_expr + { + let list = self + .flatten_expr_to_in_list(arg_expr) + .expect("failed to flatten in function"); + let rewritten_array = Array { + elem: list, + named: true, + }; + *node = FunctionArgExpr::Expr(Expr::Array(rewritten_array)); } } diff --git a/nexus/peer-bigquery/src/cursor.rs b/nexus/peer-bigquery/src/cursor.rs index ab591dcc33..23812a382a 100644 --- a/nexus/peer-bigquery/src/cursor.rs +++ b/nexus/peer-bigquery/src/cursor.rs @@ -9,7 +9,6 @@ use sqlparser::ast::Statement; use crate::BigQueryQueryExecutor; pub struct BigQueryCursor { - stmt: Statement, position: usize, stream: Mutex, schema: SchemaRef, @@ -42,7 +41,6 @@ impl BigQueryCursorManager { // Create a new cursor let cursor = BigQueryCursor { - stmt: stmt.clone(), position: 0, stream: Mutex::new(stream), schema, diff --git a/nexus/peer-cursor/src/util.rs b/nexus/peer-cursor/src/util.rs index e3e4405aa5..e87478d67b 100644 --- a/nexus/peer-cursor/src/util.rs +++ b/nexus/peer-cursor/src/util.rs @@ -88,7 +88,7 @@ pub fn records_to_query_response<'a>(records: Records) -> PgWireResult> = Arc::new(records.schema.fields.clone()); let schema_copy = pg_schema.clone(); - let data_row_stream = stream::iter(records.records.into_iter()) + let data_row_stream = stream::iter(records.records) .map(move |record| { let mut encoder = DataRowEncoder::new(schema_copy.clone()); for value in record.values.iter() { diff --git a/nexus/peer-snowflake/src/auth.rs b/nexus/peer-snowflake/src/auth.rs index 568c096437..10eb2e32e8 100644 --- a/nexus/peer-snowflake/src/auth.rs +++ b/nexus/peer-snowflake/src/auth.rs @@ -47,9 +47,8 @@ impl SnowflakeAuth { expiry_threshold: u64, ) -> anyhow::Result { let pkey = match password { - Some(pw) => { - DecodePrivateKey::from_pkcs8_encrypted_pem(&private_key, pw).context("Invalid private key or decryption failed")? - }, + Some(pw) => DecodePrivateKey::from_pkcs8_encrypted_pem(&private_key, pw) + .context("Invalid private key or decryption failed")?, None => { DecodePrivateKey::from_pkcs8_pem(&private_key).context("Invalid private key")? } @@ -77,16 +76,15 @@ impl SnowflakeAuth { // Normalize the account identifer to a form that is embedded into the JWT. // Logic adapted from Snowflake's example Python code for key-pair authentication "sql-api-generate-jwt.py". fn normalize_account_identifier(raw_account: &str) -> String { - let split_index: usize; - if !raw_account.contains(".global") { - split_index = *raw_account - .find(".") - .get_or_insert(raw_account.chars().count()); + let split_index = if !raw_account.contains(".global") { + *raw_account + .find('.') + .get_or_insert(raw_account.chars().count()) } else { - split_index = *raw_account - .find("-") - .get_or_insert(raw_account.chars().count()); - } + *raw_account + .find('-') + .get_or_insert(raw_account.chars().count()) + }; raw_account .to_uppercase() .chars() diff --git a/nexus/peer-snowflake/src/cursor.rs b/nexus/peer-snowflake/src/cursor.rs index b1a0ecc9a4..475a2d7f35 100644 --- a/nexus/peer-snowflake/src/cursor.rs +++ b/nexus/peer-snowflake/src/cursor.rs @@ -7,7 +7,6 @@ use sqlparser::ast::Statement; use tokio::sync::Mutex; pub struct SnowflakeCursor { - stmt: Statement, position: usize, stream: Mutex, schema: SchemaRef, @@ -39,7 +38,6 @@ impl SnowflakeCursorManager { // Create a new cursor let cursor = SnowflakeCursor { - stmt: stmt.clone(), position: 0, stream: Mutex::new(stream), schema, diff --git a/nexus/peer-snowflake/src/lib.rs b/nexus/peer-snowflake/src/lib.rs index c2e4fbd9d8..86f7b58544 100644 --- a/nexus/peer-snowflake/src/lib.rs +++ b/nexus/peer-snowflake/src/lib.rs @@ -28,13 +28,13 @@ mod stream; const DEFAULT_REFRESH_THRESHOLD: u64 = 3000; const DEFAULT_EXPIRY_THRESHOLD: u64 = 3600; -const SNOWFLAKE_URL_PREFIX: &'static str = "https://"; -const SNOWFLAKE_URL_SUFFIX: &'static str = ".snowflakecomputing.com/api/v2/statements"; +const SNOWFLAKE_URL_PREFIX: &str = "https://"; +const SNOWFLAKE_URL_SUFFIX: &str = ".snowflakecomputing.com/api/v2/statements"; -const DATE_OUTPUT_FORMAT: &'static str = "YYYY/MM/DD"; -const TIME_OUTPUT_FORMAT: &'static str = "HH:MI:SS.FF"; -const TIMESTAMP_OUTPUT_FORMAT: &'static str = "YYYY-MM-DDTHH24:MI:SS.FF"; -const TIMESTAMP_TZ_OUTPUT_FORMAT: &'static str = "YYYY-MM-DDTHH24:MI:SS.FFTZHTZM"; +const DATE_OUTPUT_FORMAT: &str = "YYYY/MM/DD"; +const TIME_OUTPUT_FORMAT: &str = "HH:MI:SS.FF"; +const TIMESTAMP_OUTPUT_FORMAT: &str = "YYYY-MM-DDTHH24:MI:SS.FF"; +const TIMESTAMP_TZ_OUTPUT_FORMAT: &str = "YYYY-MM-DDTHH24:MI:SS.FFTZHTZM"; #[derive(Debug, Serialize)] struct SQLStatement<'a> { @@ -59,7 +59,7 @@ pub(crate) struct ResultSetRowType { r#type: SnowflakeDataType, } -#[allow(non_snake_case)] +#[allow(non_snake_case, dead_code)] #[derive(Deserialize, Debug)] struct ResultSetPartitionInfo { rowCount: u64, @@ -207,7 +207,7 @@ impl SnowflakeQueryExecutor { }) } - pub async fn query(&self, query: &Box) -> PgWireResult { + pub async fn query(&self, query: &Query) -> PgWireResult { let mut query = query.clone(); let ast = ast::SnowflakeAst::default(); diff --git a/nexus/peer-snowflake/src/stream.rs b/nexus/peer-snowflake/src/stream.rs index b4290707a9..3434b70dfa 100644 --- a/nexus/peer-snowflake/src/stream.rs +++ b/nexus/peer-snowflake/src/stream.rs @@ -13,7 +13,6 @@ use pgwire::{ }; use secrecy::ExposeSecret; use serde::Deserialize; -use serde_json; use std::{ pin::Pin, task::{Context, Poll}, @@ -146,17 +145,21 @@ impl SnowflakeRecordStream { // really hacky workaround for parsing the UTC timezone specifically. SnowflakeDataType::TimestampLtz => { match DateTime::parse_from_str(elem, TIMESTAMP_TZ_PARSE_FORMAT) { - Ok(_) => TimestampWithTimeZone(Utc.from_utc_datetime( - &DateTime::parse_from_str(elem, TIMESTAMP_TZ_PARSE_FORMAT)? + Ok(_) => TimestampWithTimeZone( + Utc.from_utc_datetime( + &DateTime::parse_from_str(elem, TIMESTAMP_TZ_PARSE_FORMAT)? + .naive_utc(), + ), + ), + Err(_) => TimestampWithTimeZone( + Utc.from_utc_datetime( + &DateTime::parse_from_str( + &elem.replace('Z', "+0000"), + TIMESTAMP_TZ_PARSE_FORMAT, + )? .naive_utc(), - )), - Err(_) => TimestampWithTimeZone(Utc.from_utc_datetime( - &DateTime::parse_from_str( - &elem.replace("Z", "+0000"), - TIMESTAMP_TZ_PARSE_FORMAT, - )? - .naive_utc(), - )), + ), + ), } } SnowflakeDataType::TimestampNtz => PostgresTimestamp( @@ -164,21 +167,25 @@ impl SnowflakeRecordStream { ), SnowflakeDataType::TimestampTz => { match DateTime::parse_from_str(elem, TIMESTAMP_TZ_PARSE_FORMAT) { - Ok(_) => TimestampWithTimeZone(Utc.from_utc_datetime( - &DateTime::parse_from_str(elem, TIMESTAMP_TZ_PARSE_FORMAT)? + Ok(_) => TimestampWithTimeZone( + Utc.from_utc_datetime( + &DateTime::parse_from_str(elem, TIMESTAMP_TZ_PARSE_FORMAT)? + .naive_utc(), + ), + ), + Err(_) => TimestampWithTimeZone( + Utc.from_utc_datetime( + &DateTime::parse_from_str( + &elem.replace('Z', "+0000"), + TIMESTAMP_TZ_PARSE_FORMAT, + )? .naive_utc(), - )), - Err(_) => TimestampWithTimeZone(Utc.from_utc_datetime( - &DateTime::parse_from_str( - &elem.replace("Z", "+0000"), - TIMESTAMP_TZ_PARSE_FORMAT, - )? - .naive_utc(), - )), + ), + ), } } SnowflakeDataType::Variant => { - let jsonb: serde_json::Value = serde_json::from_str(&elem)?; + let jsonb: serde_json::Value = serde_json::from_str(elem)?; Value::JsonB(jsonb) } }, @@ -188,7 +195,7 @@ impl SnowflakeRecordStream { row_values.push(row_value.unwrap_or(Value::Null)); } - self.partition_index = self.partition_index + 1; + self.partition_index += 1; Ok(Record { values: row_values, @@ -200,7 +207,7 @@ impl SnowflakeRecordStream { if (self.partition_number + 1) == self.result_set.resultSetMetaData.partitionInfo.len() { return Ok(false); } - self.partition_number = self.partition_number + 1; + self.partition_number += 1; self.partition_index = 0; let partition_number = self.partition_number; let secret = self.auth.get_jwt()?.expose_secret().clone(); diff --git a/nexus/server/src/cursor.rs b/nexus/server/src/cursor.rs index 025fdbcf50..36fee27c3c 100644 --- a/nexus/server/src/cursor.rs +++ b/nexus/server/src/cursor.rs @@ -24,7 +24,7 @@ impl PeerCursors { self.cursors.remove(&name); } - pub fn get_peer(&self, name: &str) -> Option<&Box> { - self.cursors.get(name) + pub fn get_peer(&self, name: &str) -> Option<&Peer> { + self.cursors.get(name).map(|peer| peer.as_ref()) } } diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index bb7ea02f77..20ff95daf3 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -150,20 +150,20 @@ impl NexusBackend { } fn is_peer_validity_supported(peer_type: i32) -> bool { - let unsupported_peer_types = vec![ + let unsupported_peer_types = [ 4, // EVENTHUB 5, // S3 - 7, // EVENTHUBGROUP + 7, ]; !unsupported_peer_types.contains(&peer_type) } async fn check_for_mirror( catalog: &MutexGuard<'_, Catalog>, - flow_name: String, + flow_name: &str, ) -> PgWireResult> { let workflow_details = catalog - .get_workflow_details_for_flow_job(&flow_name) + .get_workflow_details_for_flow_job(flow_name) .await .map_err(|err| { PgWireError::ApiError(Box::new(PgError::Internal { @@ -257,14 +257,14 @@ impl NexusBackend { ) -> PgWireResult>> { let mut peer_holder: Option> = None; match nexus_stmt { - NexusStatement::PeerDDL { stmt: _, ddl } => match ddl { + NexusStatement::PeerDDL { stmt: _, ddl } => match ddl.as_ref() { PeerDDL::CreatePeer { peer, if_not_exists: _, } => { let peer_type = peer.r#type; if Self::is_peer_validity_supported(peer_type) { - self.validate_peer(peer_type, &peer).await.map_err(|e| { + self.validate_peer(peer_type, peer).await.map_err(|e| { PgWireError::UserError(Box::new(ErrorInfo::new( "ERROR".to_owned(), "internal_error".to_owned(), @@ -295,11 +295,10 @@ impl NexusBackend { }))); } let catalog = self.catalog.lock().await; - let mirror_details = - Self::check_for_mirror(&catalog, flow_job.name.clone()).await?; + let mirror_details = Self::check_for_mirror(&catalog, &flow_job.name).await?; if mirror_details.is_none() { catalog - .create_flow_job_entry(&flow_job) + .create_flow_job_entry(flow_job) .await .map_err(|err| { PgWireError::ApiError(Box::new(PgError::Internal { @@ -321,7 +320,7 @@ impl NexusBackend { // make a request to the flow service to start the job. let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; let workflow_id = flow_handler - .start_peer_flow_job(&flow_job, src_peer, dst_peer) + .start_peer_flow_job(flow_job, src_peer, dst_peer) .await .map_err(|err| { PgWireError::ApiError(Box::new(PgError::Internal { @@ -344,7 +343,7 @@ impl NexusBackend { None, ))]) } else { - Self::handle_mirror_existence(if_not_exists, flow_job.name) + Self::handle_mirror_existence(*if_not_exists, flow_job.name.clone()) } } PeerDDL::CreateMirrorForSelect { @@ -360,13 +359,13 @@ impl NexusBackend { { let catalog = self.catalog.lock().await; mirror_details = - Self::check_for_mirror(&catalog, qrep_flow_job.name.clone()).await?; + Self::check_for_mirror(&catalog, &qrep_flow_job.name).await?; } if mirror_details.is_none() { { let catalog = self.catalog.lock().await; catalog - .create_qrep_flow_job_entry(&qrep_flow_job) + .create_qrep_flow_job_entry(qrep_flow_job) .await .map_err(|err| { PgWireError::ApiError(Box::new(PgError::Internal { @@ -387,14 +386,14 @@ impl NexusBackend { ))]); } - let _workflow_id = self.run_qrep_mirror(&qrep_flow_job).await?; + let _workflow_id = self.run_qrep_mirror(qrep_flow_job).await?; let create_mirror_success = format!("CREATE MIRROR {}", qrep_flow_job.name); Ok(vec![Response::Execution(Tag::new_for_execution( &create_mirror_success, None, ))]) } else { - Self::handle_mirror_existence(if_not_exists, qrep_flow_job.name) + Self::handle_mirror_existence(*if_not_exists, qrep_flow_job.name.clone()) } } PeerDDL::ExecuteMirrorForSelect { flow_job_name } => { @@ -407,7 +406,7 @@ impl NexusBackend { if let Some(job) = { let catalog = self.catalog.lock().await; catalog - .get_qrep_flow_job_by_name(&flow_job_name) + .get_qrep_flow_job_by_name(flow_job_name) .await .map_err(|err| { PgWireError::ApiError(Box::new(PgError::Internal { @@ -442,7 +441,7 @@ impl NexusBackend { let catalog = self.catalog.lock().await; tracing::info!("mirror_name: {}, if_exists: {}", flow_job_name, if_exists); let workflow_details = catalog - .get_workflow_details_for_flow_job(&flow_job_name) + .get_workflow_details_for_flow_job(flow_job_name) .await .map_err(|err| { PgWireError::ApiError(Box::new(PgError::Internal { @@ -460,7 +459,7 @@ impl NexusBackend { let workflow_details = workflow_details.unwrap(); let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; flow_handler - .shutdown_flow_job(&flow_job_name, workflow_details) + .shutdown_flow_job(flow_job_name, workflow_details) .await .map_err(|err| { PgWireError::ApiError(Box::new(PgError::Internal { @@ -468,7 +467,7 @@ impl NexusBackend { })) })?; catalog - .delete_flow_job_entry(&flow_job_name) + .delete_flow_job_entry(flow_job_name) .await .map_err(|err| { PgWireError::ApiError(Box::new(PgError::Internal { @@ -480,7 +479,7 @@ impl NexusBackend { &drop_mirror_success, None, ))]) - } else if if_exists { + } else if *if_exists { let no_mirror_success = "NO SUCH MIRROR"; Ok(vec![Response::Execution(Tag::new_for_execution( no_mirror_success, diff --git a/nexus/value/src/array.rs b/nexus/value/src/array.rs index 9b50c8c679..2fa299bb34 100644 --- a/nexus/value/src/array.rs +++ b/nexus/value/src/array.rs @@ -128,7 +128,7 @@ impl ArrayValue { } } -impl<'a> ToSql for ArrayValue { +impl ToSql for ArrayValue { fn to_sql( &self, ty: &Type, @@ -235,7 +235,6 @@ impl ToSqlText for ArrayValue { ArrayValue::Timestamp(arr) => array_to_sql_text!(arr, ty, out), ArrayValue::TimestampWithTimeZone(arr) => array_to_sql_text!(arr, ty, out), ArrayValue::Empty => {} - _ => todo!(), } // remove trailing comma