diff --git a/flow/generated/protos/flow.pb.go b/flow/generated/protos/flow.pb.go index f08b021610..0b0582120d 100644 --- a/flow/generated/protos/flow.pb.go +++ b/flow/generated/protos/flow.pb.go @@ -2383,6 +2383,8 @@ type QRepConfig struct { // and instead uses the number of rows per partition to determine // how many rows to process per batch. NumRowsPerPartition uint32 `protobuf:"varint,16,opt,name=num_rows_per_partition,json=numRowsPerPartition,proto3" json:"num_rows_per_partition,omitempty"` + // Creates the watermark table on the destination as-is, can be used for some queries. + SetupWatermarkTableOnDestination bool `protobuf:"varint,17,opt,name=setup_watermark_table_on_destination,json=setupWatermarkTableOnDestination,proto3" json:"setup_watermark_table_on_destination,omitempty"` } func (x *QRepConfig) Reset() { @@ -2529,6 +2531,13 @@ func (x *QRepConfig) GetNumRowsPerPartition() uint32 { return 0 } +func (x *QRepConfig) GetSetupWatermarkTableOnDestination() bool { + if x != nil { + return x.SetupWatermarkTableOnDestination + } + return false +} + type QRepPartition struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -3400,7 +3409,7 @@ var file_flow_proto_rawDesc = []byte{ 0x70, 0x65, 0x12, 0x2c, 0x0a, 0x12, 0x75, 0x70, 0x73, 0x65, 0x72, 0x74, 0x5f, 0x6b, 0x65, 0x79, 0x5f, 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x10, 0x75, 0x70, 0x73, 0x65, 0x72, 0x74, 0x4b, 0x65, 0x79, 0x43, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73, - 0x22, 0x96, 0x06, 0x0a, 0x0a, 0x51, 0x52, 0x65, 0x70, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, + 0x22, 0xe6, 0x06, 0x0a, 0x0a, 0x51, 0x52, 0x65, 0x70, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x22, 0x0a, 0x0d, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x6a, 0x6f, 0x62, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x66, 0x6c, 0x6f, 0x77, 0x4a, 0x6f, 0x62, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x33, 0x0a, 0x0b, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x70, 0x65, @@ -3449,7 +3458,12 @@ var file_flow_proto_rawDesc = []byte{ 0x50, 0x61, 0x74, 0x68, 0x12, 0x33, 0x0a, 0x16, 0x6e, 0x75, 0x6d, 0x5f, 0x72, 0x6f, 0x77, 0x73, 0x5f, 0x70, 0x65, 0x72, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x10, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x13, 0x6e, 0x75, 0x6d, 0x52, 0x6f, 0x77, 0x73, 0x50, 0x65, 0x72, - 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x97, 0x01, 0x0a, 0x0d, 0x51, 0x52, + 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x4e, 0x0a, 0x24, 0x73, 0x65, 0x74, + 0x75, 0x70, 0x5f, 0x77, 0x61, 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, 0x6b, 0x5f, 0x74, 0x61, 0x62, + 0x6c, 0x65, 0x5f, 0x6f, 0x6e, 0x5f, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x18, 0x11, 0x20, 0x01, 0x28, 0x08, 0x52, 0x20, 0x73, 0x65, 0x74, 0x75, 0x70, 0x57, 0x61, + 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, 0x6b, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x4f, 0x6e, 0x44, 0x65, + 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x97, 0x01, 0x0a, 0x0d, 0x51, 0x52, 0x65, 0x70, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x21, 0x0a, 0x0c, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x31, diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index 0141e4f57f..abe1338bf8 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -47,6 +47,46 @@ func (q *QRepFlowExecution) SetupMetadataTables(ctx workflow.Context) error { return nil } +func (q *QRepFlowExecution) SetupWatermarkTableOnDestination(ctx workflow.Context) error { + if q.config.SetupWatermarkTableOnDestination { + q.logger.Info("setting up watermark table on destination for qrep flow: ", q.config.FlowJobName) + + ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 5 * time.Minute, + }) + + tableSchemaInput := &protos.GetTableSchemaBatchInput{ + PeerConnectionConfig: q.config.SourcePeer, + TableIdentifiers: []string{q.config.WatermarkTable}, + } + + future := workflow.ExecuteActivity(ctx, flowable.GetTableSchema, tableSchemaInput) + + var tblSchemaOutput *protos.GetTableSchemaBatchOutput + if err := future.Get(ctx, &tblSchemaOutput); err != nil { + q.logger.Error("failed to fetch schema for watermark table: ", err) + return fmt.Errorf("failed to fetch schema for watermark table %s: %w", q.config.WatermarkTable, err) + } + + // now setup the normalized tables on the destination peer + setupConfig := &protos.SetupNormalizedTableBatchInput{ + PeerConnectionConfig: q.config.DestinationPeer, + TableNameSchemaMapping: map[string]*protos.TableSchema{ + q.config.DestinationTableIdentifier: tblSchemaOutput.TableNameSchemaMapping[q.config.WatermarkTable], + }, + } + + future = workflow.ExecuteActivity(ctx, flowable.CreateNormalizedTable, setupConfig) + var createNormalizedTablesOutput *protos.SetupNormalizedTableBatchOutput + if err := future.Get(ctx, &createNormalizedTablesOutput); err != nil { + q.logger.Error("failed to create watermark table: ", err) + return fmt.Errorf("failed to create watermark table: %w", err) + } + q.logger.Info("finished setting up watermark table for qrep flow: ", q.config.FlowJobName) + } + return nil +} + // GetPartitions returns the partitions to replicate. func (q *QRepFlowExecution) GetPartitions( ctx workflow.Context, @@ -271,6 +311,11 @@ func QRepFlowWorkflow( } q.logger.Info("metadata tables setup for peer flow - ", config.FlowJobName) + err = q.SetupWatermarkTableOnDestination(ctx) + if err != nil { + return fmt.Errorf("failed to setup watermark table: %w", err) + } + logger.Info("fetching partitions to replicate for peer flow - ", config.FlowJobName) partitions, err := q.GetPartitions(ctx, lastPartition) if err != nil { diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs index 6a727d19f9..34d0e19a21 100644 --- a/nexus/analyzer/src/lib.rs +++ b/nexus/analyzer/src/lib.rs @@ -620,9 +620,9 @@ fn parse_db_options( } DbType::S3 => { let s3_conn_str: String = opts - .get("metadata_db") - .map(|s| s.to_string()) - .unwrap_or_default(); + .get("metadata_db") + .map(|s| s.to_string()) + .unwrap_or_default(); let metadata_db = parse_metadata_db_info(&s3_conn_str)?; let s3_config = S3Config { url: opts diff --git a/nexus/analyzer/src/qrep.rs b/nexus/analyzer/src/qrep.rs index e112422032..dda1a92478 100644 --- a/nexus/analyzer/src/qrep.rs +++ b/nexus/analyzer/src/qrep.rs @@ -103,7 +103,11 @@ lazy_static::lazy_static! { default_value: false, required: false, }, - ] + QRepOptionType::Boolean { + name: "setup_watermark_table_on_destination", + default_value: false, + required: false + }] }; } diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs index cd8f661b36..a50a489b08 100644 --- a/nexus/flow-rs/src/grpc.rs +++ b/nexus/flow-rs/src/grpc.rs @@ -283,6 +283,8 @@ impl FlowGrpcClient { Value::Bool(v) => { if key == "initial_copy_only" { cfg.initial_copy_only = *v; + } else if key == "setup_watermark_table_on_destination" { + cfg.setup_watermark_table_on_destination = *v; } else { return anyhow::Result::Err(anyhow::anyhow!("invalid bool option {}", key)); } diff --git a/nexus/peer-bigquery/src/stream.rs b/nexus/peer-bigquery/src/stream.rs index e4c3aca513..fc4867b4f3 100644 --- a/nexus/peer-bigquery/src/stream.rs +++ b/nexus/peer-bigquery/src/stream.rs @@ -149,9 +149,7 @@ impl BqRecordStream { if let Some(ts) = timestamp { let naive_datetime = NaiveDateTime::from_timestamp_opt(ts, 0) .ok_or(anyhow::Error::msg("Invalid naive datetime"))?; - Some(Value::Timestamp(Utc.from_utc_datetime( - &naive_datetime, - ))) + Some(Value::Timestamp(Utc.from_utc_datetime(&naive_datetime))) } else { None } diff --git a/nexus/pt/src/peerdb_flow.rs b/nexus/pt/src/peerdb_flow.rs index 05eaf43a79..ab35dc9efb 100644 --- a/nexus/pt/src/peerdb_flow.rs +++ b/nexus/pt/src/peerdb_flow.rs @@ -413,6 +413,9 @@ pub struct QRepConfig { /// how many rows to process per batch. #[prost(uint32, tag="16")] pub num_rows_per_partition: u32, + /// Creates the watermark table on the destination as-is, can be used for some queries. + #[prost(bool, tag="17")] + pub setup_watermark_table_on_destination: bool, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/nexus/pt/src/peerdb_flow.serde.rs b/nexus/pt/src/peerdb_flow.serde.rs index c69bcdc15e..d0f98e6ff3 100644 --- a/nexus/pt/src/peerdb_flow.serde.rs +++ b/nexus/pt/src/peerdb_flow.serde.rs @@ -2362,6 +2362,9 @@ impl serde::Serialize for QRepConfig { if self.num_rows_per_partition != 0 { len += 1; } + if self.setup_watermark_table_on_destination { + len += 1; + } let mut struct_ser = serializer.serialize_struct("peerdb_flow.QRepConfig", len)?; if !self.flow_job_name.is_empty() { struct_ser.serialize_field("flowJobName", &self.flow_job_name)?; @@ -2413,6 +2416,9 @@ impl serde::Serialize for QRepConfig { if self.num_rows_per_partition != 0 { struct_ser.serialize_field("numRowsPerPartition", &self.num_rows_per_partition)?; } + if self.setup_watermark_table_on_destination { + struct_ser.serialize_field("setupWatermarkTableOnDestination", &self.setup_watermark_table_on_destination)?; + } struct_ser.end() } } @@ -2454,6 +2460,8 @@ impl<'de> serde::Deserialize<'de> for QRepConfig { "stagingPath", "num_rows_per_partition", "numRowsPerPartition", + "setup_watermark_table_on_destination", + "setupWatermarkTableOnDestination", ]; #[allow(clippy::enum_variant_names)] @@ -2474,6 +2482,7 @@ impl<'de> serde::Deserialize<'de> for QRepConfig { WriteMode, StagingPath, NumRowsPerPartition, + SetupWatermarkTableOnDestination, __SkipField__, } impl<'de> serde::Deserialize<'de> for GeneratedField { @@ -2512,6 +2521,7 @@ impl<'de> serde::Deserialize<'de> for QRepConfig { "writeMode" | "write_mode" => Ok(GeneratedField::WriteMode), "stagingPath" | "staging_path" => Ok(GeneratedField::StagingPath), "numRowsPerPartition" | "num_rows_per_partition" => Ok(GeneratedField::NumRowsPerPartition), + "setupWatermarkTableOnDestination" | "setup_watermark_table_on_destination" => Ok(GeneratedField::SetupWatermarkTableOnDestination), _ => Ok(GeneratedField::__SkipField__), } } @@ -2547,6 +2557,7 @@ impl<'de> serde::Deserialize<'de> for QRepConfig { let mut write_mode__ = None; let mut staging_path__ = None; let mut num_rows_per_partition__ = None; + let mut setup_watermark_table_on_destination__ = None; while let Some(k) = map.next_key()? { match k { GeneratedField::FlowJobName => { @@ -2655,6 +2666,12 @@ impl<'de> serde::Deserialize<'de> for QRepConfig { Some(map.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) ; } + GeneratedField::SetupWatermarkTableOnDestination => { + if setup_watermark_table_on_destination__.is_some() { + return Err(serde::de::Error::duplicate_field("setupWatermarkTableOnDestination")); + } + setup_watermark_table_on_destination__ = Some(map.next_value()?); + } GeneratedField::__SkipField__ => { let _ = map.next_value::()?; } @@ -2677,6 +2694,7 @@ impl<'de> serde::Deserialize<'de> for QRepConfig { write_mode: write_mode__, staging_path: staging_path__.unwrap_or_default(), num_rows_per_partition: num_rows_per_partition__.unwrap_or_default(), + setup_watermark_table_on_destination: setup_watermark_table_on_destination__.unwrap_or_default(), }) } } diff --git a/protos/flow.proto b/protos/flow.proto index 86eb36d5a5..1cda641cac 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -279,6 +279,9 @@ message QRepConfig { // and instead uses the number of rows per partition to determine // how many rows to process per batch. uint32 num_rows_per_partition = 16; + + // Creates the watermark table on the destination as-is, can be used for some queries. + bool setup_watermark_table_on_destination = 17; } message QRepPartition { diff --git a/ui/grpc_generated/flow.ts b/ui/grpc_generated/flow.ts index 7faf3c636b..159842ccb2 100644 --- a/ui/grpc_generated/flow.ts +++ b/ui/grpc_generated/flow.ts @@ -386,6 +386,8 @@ export interface QRepConfig { * how many rows to process per batch. */ numRowsPerPartition: number; + /** Creates the watermark table on the destination as-is, can be used for some queries. */ + setupWatermarkTableOnDestination: boolean; } export interface QRepPartition { @@ -4713,6 +4715,7 @@ function createBaseQRepConfig(): QRepConfig { writeMode: undefined, stagingPath: "", numRowsPerPartition: 0, + setupWatermarkTableOnDestination: false, }; } @@ -4766,6 +4769,9 @@ export const QRepConfig = { if (message.numRowsPerPartition !== 0) { writer.uint32(128).uint32(message.numRowsPerPartition); } + if (message.setupWatermarkTableOnDestination === true) { + writer.uint32(136).bool(message.setupWatermarkTableOnDestination); + } return writer; }, @@ -4888,6 +4894,13 @@ export const QRepConfig = { message.numRowsPerPartition = reader.uint32(); continue; + case 17: + if (tag !== 136) { + break; + } + + message.setupWatermarkTableOnDestination = reader.bool(); + continue; } if ((tag & 7) === 4 || tag === 0) { break; @@ -4917,6 +4930,9 @@ export const QRepConfig = { writeMode: isSet(object.writeMode) ? QRepWriteMode.fromJSON(object.writeMode) : undefined, stagingPath: isSet(object.stagingPath) ? String(object.stagingPath) : "", numRowsPerPartition: isSet(object.numRowsPerPartition) ? Number(object.numRowsPerPartition) : 0, + setupWatermarkTableOnDestination: isSet(object.setupWatermarkTableOnDestination) + ? Boolean(object.setupWatermarkTableOnDestination) + : false, }; }, @@ -4970,6 +4986,9 @@ export const QRepConfig = { if (message.numRowsPerPartition !== 0) { obj.numRowsPerPartition = Math.round(message.numRowsPerPartition); } + if (message.setupWatermarkTableOnDestination === true) { + obj.setupWatermarkTableOnDestination = message.setupWatermarkTableOnDestination; + } return obj; }, @@ -5000,6 +5019,7 @@ export const QRepConfig = { : undefined; message.stagingPath = object.stagingPath ?? ""; message.numRowsPerPartition = object.numRowsPerPartition ?? 0; + message.setupWatermarkTableOnDestination = object.setupWatermarkTableOnDestination ?? false; return message; }, };