From 2de985b81c763feee3c518f061ec5061699eee6e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 22 Nov 2023 19:13:22 +0000 Subject: [PATCH 1/2] nexus cleanup (#702) 1. fix typo: QueryAssocation to QueryAssociation 2. remove unnecessary allocations for two-item keys_to_ignore 3. make NexusQueryParser get_peers_bridge/parse_simple_sql async; all callers are async 4. use with_capacity whenever straightforward to do so 5. change peer-snowflake SQLStatement parameters to struct since it serializes to a constant object --- nexus/analyzer/src/lib.rs | 15 ++++------- nexus/catalog/src/lib.rs | 2 +- nexus/parser/src/lib.rs | 20 ++++++-------- nexus/peer-bigquery/src/stream.rs | 2 +- nexus/peer-snowflake/src/lib.rs | 43 +++++++++++++------------------ nexus/server/src/cursor.rs | 4 +-- nexus/server/src/main.rs | 16 ++++++------ nexus/value/src/lib.rs | 4 +-- 8 files changed, 45 insertions(+), 61 deletions(-) diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs index 0824abe2f2..2e64012a35 100644 --- a/nexus/analyzer/src/lib.rs +++ b/nexus/analyzer/src/lib.rs @@ -40,13 +40,13 @@ impl<'a> PeerExistanceAnalyzer<'a> { } #[derive(Debug, Clone)] -pub enum QueryAssocation { +pub enum QueryAssociation { Peer(Box), Catalog, } impl<'a> StatementAnalyzer for PeerExistanceAnalyzer<'a> { - type Output = QueryAssocation; + type Output = QueryAssociation; fn analyze(&self, statement: &Statement) -> anyhow::Result { let mut peers_touched: HashSet = HashSet::new(); @@ -78,9 +78,9 @@ impl<'a> StatementAnalyzer for PeerExistanceAnalyzer<'a> { anyhow::bail!("queries touching multiple peers are not supported") } else if let Some(peer_name) = peers_touched.iter().next() { let peer = self.peers.get(peer_name).unwrap(); - Ok(QueryAssocation::Peer(Box::new(peer.clone()))) + Ok(QueryAssociation::Peer(Box::new(peer.clone()))) } else { - Ok(QueryAssocation::Catalog) + Ok(QueryAssociation::Catalog) } } } @@ -785,14 +785,9 @@ fn parse_db_options( }) 
.unwrap_or_default(); - let keys_to_ignore: HashSet = vec!["metadata_db", "unnest_columns"] - .into_iter() - .map(|s| s.to_string()) - .collect(); - let mut eventhubs: HashMap = HashMap::new(); for (key, _) in opts { - if keys_to_ignore.contains(key) { + if matches!(key, "metadata_db" | "unnest_columns") { continue; } diff --git a/nexus/catalog/src/lib.rs b/nexus/catalog/src/lib.rs index 15fccf2233..c1851432e0 100644 --- a/nexus/catalog/src/lib.rs +++ b/nexus/catalog/src/lib.rs @@ -211,7 +211,7 @@ impl Catalog { let rows = self.pg.query(&stmt, &[]).await?; - let mut peers = HashMap::new(); + let mut peers = HashMap::with_capacity(rows.len()); for row in rows { let name: &str = row.get(1); diff --git a/nexus/parser/src/lib.rs b/nexus/parser/src/lib.rs index f5b2aac340..02c8cb5a27 100644 --- a/nexus/parser/src/lib.rs +++ b/nexus/parser/src/lib.rs @@ -2,7 +2,7 @@ use std::{collections::HashMap, sync::Arc}; use analyzer::{ CursorEvent, PeerCursorAnalyzer, PeerDDL, PeerDDLAnalyzer, PeerExistanceAnalyzer, - QueryAssocation, StatementAnalyzer, + QueryAssociation, StatementAnalyzer, }; use async_trait::async_trait; use catalog::Catalog; @@ -27,7 +27,7 @@ pub enum NexusStatement { }, PeerQuery { stmt: Statement, - assoc: QueryAssocation, + assoc: QueryAssociation, }, PeerCursor { stmt: Statement, @@ -96,13 +96,9 @@ impl NexusQueryParser { Self { catalog } } - pub fn get_peers_bridge(&self) -> PgWireResult> { - let peers = tokio::task::block_in_place(move || { - tokio::runtime::Handle::current().block_on(async move { - let catalog = self.catalog.lock().await; - catalog.get_peers().await - }) - }); + pub async fn get_peers_bridge(&self) -> PgWireResult> { + let catalog = self.catalog.lock().await; + let peers = catalog.get_peers().await; peers.map_err(|e| { PgWireError::UserError(Box::new(ErrorInfo::new( @@ -113,7 +109,7 @@ impl NexusQueryParser { }) } - pub fn parse_simple_sql(&self, sql: &str) -> PgWireResult { + pub async fn parse_simple_sql(&self, sql: &str) -> 
PgWireResult { let mut stmts = Parser::parse_sql(&DIALECT, sql).map_err(|e| PgWireError::ApiError(Box::new(e)))?; if stmts.len() > 1 { @@ -131,7 +127,7 @@ impl NexusQueryParser { }) } else { let stmt = stmts.remove(0); - let peers = self.get_peers_bridge()?; + let peers = self.get_peers_bridge().await?; let nexus_stmt = NexusStatement::new(peers, &stmt)?; Ok(NexusParsedStatement { statement: nexus_stmt, @@ -162,7 +158,7 @@ impl QueryParser for NexusQueryParser { }) } else { let stmt = stmts.remove(0); - let peers = self.get_peers_bridge()?; + let peers = self.get_peers_bridge().await?; let nexus_stmt = NexusStatement::new(peers, &stmt)?; Ok(NexusParsedStatement { statement: nexus_stmt, diff --git a/nexus/peer-bigquery/src/stream.rs b/nexus/peer-bigquery/src/stream.rs index d4acce7fd0..02851ae2cd 100644 --- a/nexus/peer-bigquery/src/stream.rs +++ b/nexus/peer-bigquery/src/stream.rs @@ -101,7 +101,7 @@ impl BqRecordStream { } pub fn convert_result_set_item(&self, result_set: &ResultSet) -> anyhow::Result { - let mut values = Vec::new(); + let mut values = Vec::with_capacity(self.schema.fields.len()); for field in &self.schema.fields { let field_type = &field.r#type; let field_name = &field.name; diff --git a/nexus/peer-snowflake/src/lib.rs b/nexus/peer-snowflake/src/lib.rs index 7bb5b18790..a4eeeacb91 100644 --- a/nexus/peer-snowflake/src/lib.rs +++ b/nexus/peer-snowflake/src/lib.rs @@ -7,7 +7,7 @@ use pgwire::error::{ErrorInfo, PgWireError, PgWireResult}; use sqlparser::dialect::GenericDialect; use sqlparser::parser; use std::cmp::min; -use std::{collections::HashMap, time::Duration}; +use std::time::Duration; use stream::SnowflakeDataType; use auth::SnowflakeAuth; @@ -36,6 +36,15 @@ const TIME_OUTPUT_FORMAT: &str = "HH:MI:SS.FF"; const TIMESTAMP_OUTPUT_FORMAT: &str = "YYYY-MM-DDTHH24:MI:SS.FF"; const TIMESTAMP_TZ_OUTPUT_FORMAT: &str = "YYYY-MM-DDTHH24:MI:SS.FFTZHTZM"; +#[derive(Debug, Serialize)] +struct SQLStatementParameters<'a> { + pub date_output_format: &'a 
str, + pub time_output_format: &'a str, + pub timestamp_ltz_output_format: &'a str, + pub timestamp_ntz_output_format: &'a str, + pub timestamp_tz_output_format: &'a str, +} + #[derive(Debug, Serialize)] struct SQLStatement<'a> { statement: &'a str, @@ -43,7 +52,7 @@ struct SQLStatement<'a> { database: &'a str, warehouse: &'a str, role: &'a str, - parameters: HashMap, + parameters: SQLStatementParameters<'a>, } #[allow(non_snake_case)] @@ -147,28 +156,6 @@ impl SnowflakeQueryExecutor { #[async_recursion] #[tracing::instrument(name = "peer_sflake::process_query", skip_all)] async fn process_query(&self, query_str: &str) -> anyhow::Result { - let mut parameters = HashMap::new(); - parameters.insert( - "date_output_format".to_string(), - DATE_OUTPUT_FORMAT.to_string(), - ); - parameters.insert( - "time_output_format".to_string(), - TIME_OUTPUT_FORMAT.to_string(), - ); - parameters.insert( - "timestamp_ltz_output_format".to_string(), - TIMESTAMP_TZ_OUTPUT_FORMAT.to_string(), - ); - parameters.insert( - "timestamp_ntz_output_format".to_string(), - TIMESTAMP_OUTPUT_FORMAT.to_string(), - ); - parameters.insert( - "timestamp_tz_output_format".to_string(), - TIMESTAMP_TZ_OUTPUT_FORMAT.to_string(), - ); - let mut auth = self.auth.clone(); let jwt = auth.get_jwt()?; let secret = jwt.expose_secret().clone(); @@ -186,7 +173,13 @@ impl SnowflakeQueryExecutor { database: &self.config.database, warehouse: &self.config.warehouse, role: &self.config.role, - parameters, + parameters: SQLStatementParameters { + date_output_format: DATE_OUTPUT_FORMAT, + time_output_format: TIME_OUTPUT_FORMAT, + timestamp_ltz_output_format: TIMESTAMP_TZ_OUTPUT_FORMAT, + timestamp_ntz_output_format: TIMESTAMP_OUTPUT_FORMAT, + timestamp_tz_output_format: TIMESTAMP_TZ_OUTPUT_FORMAT, + }, }) .send() .await diff --git a/nexus/server/src/cursor.rs b/nexus/server/src/cursor.rs index 36fee27c3c..58d0e6a0c0 100644 --- a/nexus/server/src/cursor.rs +++ b/nexus/server/src/cursor.rs @@ -20,8 +20,8 @@ impl 
PeerCursors { self.cursors.insert(name, peer); } - pub fn remove_cursor(&mut self, name: String) { - self.cursors.remove(&name); + pub fn remove_cursor(&mut self, name: &str) { + self.cursors.remove(name); } pub fn get_peer(&self, name: &str) -> Option<&Peer> { diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index 1e38b2e979..c218f9f0c7 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -4,7 +4,7 @@ use std::{ time::Duration, }; -use analyzer::{PeerDDL, QueryAssocation}; +use analyzer::{PeerDDL, QueryAssociation}; use async_trait::async_trait; use bytes::{BufMut, BytesMut}; use catalog::{Catalog, CatalogConfig, WorkflowDetails}; @@ -141,7 +141,7 @@ impl NexusBackend { } peer_cursor::CursorModification::Closed(cursors) => { for cursor_name in cursors { - peer_cursors.remove_cursor(cursor_name); + peer_cursors.remove_cursor(&cursor_name); } Ok(vec![Response::Execution(Tag::new_for_execution( "CLOSE CURSOR", @@ -826,7 +826,7 @@ impl NexusBackend { NexusStatement::PeerQuery { stmt, assoc } => { // get the query executor let executor = match assoc { - QueryAssocation::Peer(peer) => { + QueryAssociation::Peer(peer) => { tracing::info!("handling peer[{}] query: {}", peer.name, stmt); peer_holder = Some(peer.clone()); self.get_peer_executor(&peer).await.map_err(|err| { @@ -835,7 +835,7 @@ impl NexusBackend { })) })? 
} - QueryAssocation::Catalog => { + QueryAssociation::Catalog => { tracing::info!("handling catalog query: {}", stmt); let catalog = self.catalog.lock().await; catalog.get_executor() @@ -961,7 +961,7 @@ impl SimpleQueryHandler for NexusBackend { where C: ClientInfo + Unpin + Send + Sync, { - let parsed = self.query_parser.parse_simple_sql(sql)?; + let parsed = self.query_parser.parse_simple_sql(sql).await?; let nexus_stmt = parsed.statement; self.handle_query(nexus_stmt).await } @@ -1039,7 +1039,7 @@ impl ExtendedQueryHandler for NexusBackend { sql = sql.replace(&format!("${}", i + 1), &parameter_to_string(portal, i)?); } - let parsed = self.query_parser.parse_simple_sql(&sql)?; + let parsed = self.query_parser.parse_simple_sql(&sql).await?; let nexus_stmt = parsed.statement; let result = self.handle_query(nexus_stmt).await?; if result.is_empty() { @@ -1077,7 +1077,7 @@ impl ExtendedQueryHandler for NexusBackend { NexusStatement::Empty => Ok(DescribeResponse::no_data()), NexusStatement::PeerQuery { stmt, assoc } => { let schema: Option = match assoc { - QueryAssocation::Peer(peer) => { + QueryAssociation::Peer(peer) => { // if the peer is of type bigquery, let us route the query to bq. match &peer.config { Some(Config::BigqueryConfig(_)) => { @@ -1124,7 +1124,7 @@ impl ExtendedQueryHandler for NexusBackend { } } } - QueryAssocation::Catalog => { + QueryAssociation::Catalog => { let catalog = self.catalog.lock().await; let executor = catalog.get_executor(); executor.describe(stmt).await? 
diff --git a/nexus/value/src/lib.rs b/nexus/value/src/lib.rs index f6dbe0687b..3e6e0a1cbd 100644 --- a/nexus/value/src/lib.rs +++ b/nexus/value/src/lib.rs @@ -203,7 +203,7 @@ impl Value { } } serde_json::Value::Object(map) => { - let mut hstore = HashMap::new(); + let mut hstore = HashMap::with_capacity(map.len()); for (key, value) in map { hstore.insert(key.clone(), value.to_string()); } @@ -253,7 +253,7 @@ impl Value { Value::Uuid(u) => serde_json::Value::String(u.to_string()), Value::Enum(s) => serde_json::Value::String(s.clone()), Value::Hstore(map) => { - let mut object = serde_json::Map::new(); + let mut object = serde_json::Map::with_capacity(map.len()); for (key, value) in map { object.insert(key.clone(), serde_json::Value::String(value.clone())); } From 7bf1f187e866609f4cb46b5b2a257cd7cee999a1 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Thu, 23 Nov 2023 19:31:45 +0530 Subject: [PATCH 2/2] New Graph UI (#684) A proposal from my end for a more polished UI for our Sync History. 
Screenshot 2023-11-19 at 10 07 37 AM - Hovering on the bars reveals a proper tooltip box with rows synced and at what time Screenshot 2023-11-19 at 10 31 57 AM --- .../[mirrorId]/aggregatedCountsByInterval.ts | 6 ++- ui/app/mirrors/edit/[mirrorId]/cdcGraph.tsx | 47 ++++--------------- 2 files changed, 15 insertions(+), 38 deletions(-) diff --git a/ui/app/mirrors/edit/[mirrorId]/aggregatedCountsByInterval.ts b/ui/app/mirrors/edit/[mirrorId]/aggregatedCountsByInterval.ts index b2fdf7b1fb..b9b3216625 100644 --- a/ui/app/mirrors/edit/[mirrorId]/aggregatedCountsByInterval.ts +++ b/ui/app/mirrors/edit/[mirrorId]/aggregatedCountsByInterval.ts @@ -36,7 +36,7 @@ function aggregateCountsByInterval( // Iterate through the timestamps and populate the aggregatedCounts object for (let { timestamp, count } of timestamps) { - const date = roundUpToNearestNMinutes(timestamp, 15); + const date = roundUpToNearestNMinutes(timestamp, 1); const formattedTimestamp = moment(date).format(timeUnit); if (!aggregatedCounts[formattedTimestamp]) { @@ -64,6 +64,10 @@ function aggregateCountsByInterval( currentTimestamp.setHours(currentTimestamp.getHours() - 1); } else if (interval === '15min') { currentTimestamp.setMinutes(currentTimestamp.getMinutes() - 15); + } else if (interval === '1min') { + currentTimestamp.setMinutes(currentTimestamp.getMinutes() - 1); + } else if (interval === '5min') { + currentTimestamp.setMinutes(currentTimestamp.getMinutes() - 5); } else if (interval === 'month') { currentTimestamp.setMonth(currentTimestamp.getMonth() - 1); } else if (interval === 'day') { diff --git a/ui/app/mirrors/edit/[mirrorId]/cdcGraph.tsx b/ui/app/mirrors/edit/[mirrorId]/cdcGraph.tsx index cdc6d91a37..951baeaa3c 100644 --- a/ui/app/mirrors/edit/[mirrorId]/cdcGraph.tsx +++ b/ui/app/mirrors/edit/[mirrorId]/cdcGraph.tsx @@ -1,5 +1,6 @@ 'use client'; import { Label } from '@/lib/Label'; +import { BarChart } from '@tremor/react'; import moment from 'moment'; import { useEffect, useState } from 
'react'; import ReactSelect from 'react-select'; @@ -51,15 +52,15 @@ function CdcGraph({ syncs }: { syncs: SyncStatusRow[] }) {
-
- {counts.map((count, i) => ( - - ))} -
+ ({ + name: formatGraphLabel(new Date(count[0]), aggregateType), + 'Rows synced at a point in time': count[1], + }))} + index='name' + categories={['Rows synced at a point in time']} + /> ); } @@ -80,32 +81,4 @@ function formatGraphLabel(date: Date, aggregateType: String): string { } } -type GraphBarProps = { - count: number; - label: string; -}; - -function GraphBar({ label, count }: GraphBarProps) { - let color = - count && count > 0 ? 'bg-positive-fill-normal' : 'bg-base-border-subtle'; - let classNames = `relative w-10 h-24 rounded ${color}`; - return ( -
-
-
-
{label}
-
{numberWithCommas(count)}
-
-
-
- ); -} - -function numberWithCommas(x: number): string { - return x.toString().replace(/\B(?=(\d{3})+(?!\d))/g, ','); -} - export default CdcGraph;