Skip to content

Commit

Permalink
optionally create watermark table on destination for qrep mirrors (#528)
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Oct 18, 2023
1 parent 17549b2 commit be08323
Show file tree
Hide file tree
Showing 10 changed files with 116 additions and 9 deletions.
18 changes: 16 additions & 2 deletions flow/generated/protos/flow.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 45 additions & 0 deletions flow/workflows/qrep_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,46 @@ func (q *QRepFlowExecution) SetupMetadataTables(ctx workflow.Context) error {
return nil
}

// SetupWatermarkTableOnDestination optionally creates the watermark table on
// the destination peer, using the schema the table has on the source peer.
//
// It is a no-op returning nil unless config.SetupWatermarkTableOnDestination
// is set. When enabled it runs two activities, each with a 5 minute
// start-to-close timeout:
//
//  1. flowable.GetTableSchema against config.SourcePeer for
//     config.WatermarkTable;
//  2. flowable.CreateNormalizedTable against config.DestinationPeer, mapping
//     config.DestinationTableIdentifier to the schema fetched in step 1.
//
// It returns a wrapped error if either activity fails, or if the schema
// activity succeeds but yields no schema for the watermark table.
func (q *QRepFlowExecution) SetupWatermarkTableOnDestination(ctx workflow.Context) error {
	// Guard clause: nothing to do when the option is disabled.
	if !q.config.SetupWatermarkTableOnDestination {
		return nil
	}

	q.logger.Info("setting up watermark table on destination for qrep flow: ", q.config.FlowJobName)

	ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		StartToCloseTimeout: 5 * time.Minute,
	})

	// Fetch the watermark table's schema from the source peer.
	tableSchemaInput := &protos.GetTableSchemaBatchInput{
		PeerConnectionConfig: q.config.SourcePeer,
		TableIdentifiers:     []string{q.config.WatermarkTable},
	}

	future := workflow.ExecuteActivity(ctx, flowable.GetTableSchema, tableSchemaInput)

	var tblSchemaOutput *protos.GetTableSchemaBatchOutput
	if err := future.Get(ctx, &tblSchemaOutput); err != nil {
		q.logger.Error("failed to fetch schema for watermark table: ", err)
		return fmt.Errorf("failed to fetch schema for watermark table %s: %w", q.config.WatermarkTable, err)
	}

	// A nil output would panic on field access, and a missing map key yields a
	// nil schema that would otherwise be forwarded to the create activity.
	// Fail here with an explicit error instead.
	var watermarkSchema *protos.TableSchema
	if tblSchemaOutput != nil {
		watermarkSchema = tblSchemaOutput.TableNameSchemaMapping[q.config.WatermarkTable]
	}
	if watermarkSchema == nil {
		return fmt.Errorf("no schema returned for watermark table %s", q.config.WatermarkTable)
	}

	// Create the table on the destination peer under the destination
	// identifier, reusing the source schema as-is.
	setupConfig := &protos.SetupNormalizedTableBatchInput{
		PeerConnectionConfig: q.config.DestinationPeer,
		TableNameSchemaMapping: map[string]*protos.TableSchema{
			q.config.DestinationTableIdentifier: watermarkSchema,
		},
	}

	future = workflow.ExecuteActivity(ctx, flowable.CreateNormalizedTable, setupConfig)
	var createNormalizedTablesOutput *protos.SetupNormalizedTableBatchOutput
	if err := future.Get(ctx, &createNormalizedTablesOutput); err != nil {
		q.logger.Error("failed to create watermark table: ", err)
		return fmt.Errorf("failed to create watermark table: %w", err)
	}

	q.logger.Info("finished setting up watermark table for qrep flow: ", q.config.FlowJobName)
	return nil
}

// GetPartitions returns the partitions to replicate.
func (q *QRepFlowExecution) GetPartitions(
ctx workflow.Context,
Expand Down Expand Up @@ -271,6 +311,11 @@ func QRepFlowWorkflow(
}
q.logger.Info("metadata tables setup for peer flow - ", config.FlowJobName)

err = q.SetupWatermarkTableOnDestination(ctx)
if err != nil {
return fmt.Errorf("failed to setup watermark table: %w", err)
}

logger.Info("fetching partitions to replicate for peer flow - ", config.FlowJobName)
partitions, err := q.GetPartitions(ctx, lastPartition)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions nexus/analyzer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -620,9 +620,9 @@ fn parse_db_options(
}
DbType::S3 => {
let s3_conn_str: String = opts
.get("metadata_db")
.map(|s| s.to_string())
.unwrap_or_default();
.get("metadata_db")
.map(|s| s.to_string())
.unwrap_or_default();
let metadata_db = parse_metadata_db_info(&s3_conn_str)?;
let s3_config = S3Config {
url: opts
Expand Down
6 changes: 5 additions & 1 deletion nexus/analyzer/src/qrep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,11 @@ lazy_static::lazy_static! {
default_value: false,
required: false,
},
]
QRepOptionType::Boolean {
name: "setup_watermark_table_on_destination",
default_value: false,
required: false
}]
};
}

Expand Down
2 changes: 2 additions & 0 deletions nexus/flow-rs/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ impl FlowGrpcClient {
Value::Bool(v) => {
if key == "initial_copy_only" {
cfg.initial_copy_only = *v;
} else if key == "setup_watermark_table_on_destination" {
cfg.setup_watermark_table_on_destination = *v;
} else {
return anyhow::Result::Err(anyhow::anyhow!("invalid bool option {}", key));
}
Expand Down
4 changes: 1 addition & 3 deletions nexus/peer-bigquery/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,7 @@ impl BqRecordStream {
if let Some(ts) = timestamp {
let naive_datetime = NaiveDateTime::from_timestamp_opt(ts, 0)
.ok_or(anyhow::Error::msg("Invalid naive datetime"))?;
Some(Value::Timestamp(Utc.from_utc_datetime(
&naive_datetime,
)))
Some(Value::Timestamp(Utc.from_utc_datetime(&naive_datetime)))
} else {
None
}
Expand Down
3 changes: 3 additions & 0 deletions nexus/pt/src/peerdb_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,9 @@ pub struct QRepConfig {
/// how many rows to process per batch.
#[prost(uint32, tag="16")]
pub num_rows_per_partition: u32,
/// Creates the watermark table on the destination as-is, can be used for some queries.
#[prost(bool, tag="17")]
pub setup_watermark_table_on_destination: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
18 changes: 18 additions & 0 deletions nexus/pt/src/peerdb_flow.serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2362,6 +2362,9 @@ impl serde::Serialize for QRepConfig {
if self.num_rows_per_partition != 0 {
len += 1;
}
if self.setup_watermark_table_on_destination {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("peerdb_flow.QRepConfig", len)?;
if !self.flow_job_name.is_empty() {
struct_ser.serialize_field("flowJobName", &self.flow_job_name)?;
Expand Down Expand Up @@ -2413,6 +2416,9 @@ impl serde::Serialize for QRepConfig {
if self.num_rows_per_partition != 0 {
struct_ser.serialize_field("numRowsPerPartition", &self.num_rows_per_partition)?;
}
if self.setup_watermark_table_on_destination {
struct_ser.serialize_field("setupWatermarkTableOnDestination", &self.setup_watermark_table_on_destination)?;
}
struct_ser.end()
}
}
Expand Down Expand Up @@ -2454,6 +2460,8 @@ impl<'de> serde::Deserialize<'de> for QRepConfig {
"stagingPath",
"num_rows_per_partition",
"numRowsPerPartition",
"setup_watermark_table_on_destination",
"setupWatermarkTableOnDestination",
];

#[allow(clippy::enum_variant_names)]
Expand All @@ -2474,6 +2482,7 @@ impl<'de> serde::Deserialize<'de> for QRepConfig {
WriteMode,
StagingPath,
NumRowsPerPartition,
SetupWatermarkTableOnDestination,
__SkipField__,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
Expand Down Expand Up @@ -2512,6 +2521,7 @@ impl<'de> serde::Deserialize<'de> for QRepConfig {
"writeMode" | "write_mode" => Ok(GeneratedField::WriteMode),
"stagingPath" | "staging_path" => Ok(GeneratedField::StagingPath),
"numRowsPerPartition" | "num_rows_per_partition" => Ok(GeneratedField::NumRowsPerPartition),
"setupWatermarkTableOnDestination" | "setup_watermark_table_on_destination" => Ok(GeneratedField::SetupWatermarkTableOnDestination),
_ => Ok(GeneratedField::__SkipField__),
}
}
Expand Down Expand Up @@ -2547,6 +2557,7 @@ impl<'de> serde::Deserialize<'de> for QRepConfig {
let mut write_mode__ = None;
let mut staging_path__ = None;
let mut num_rows_per_partition__ = None;
let mut setup_watermark_table_on_destination__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::FlowJobName => {
Expand Down Expand Up @@ -2655,6 +2666,12 @@ impl<'de> serde::Deserialize<'de> for QRepConfig {
Some(map.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
;
}
GeneratedField::SetupWatermarkTableOnDestination => {
if setup_watermark_table_on_destination__.is_some() {
return Err(serde::de::Error::duplicate_field("setupWatermarkTableOnDestination"));
}
setup_watermark_table_on_destination__ = Some(map.next_value()?);
}
GeneratedField::__SkipField__ => {
let _ = map.next_value::<serde::de::IgnoredAny>()?;
}
Expand All @@ -2677,6 +2694,7 @@ impl<'de> serde::Deserialize<'de> for QRepConfig {
write_mode: write_mode__,
staging_path: staging_path__.unwrap_or_default(),
num_rows_per_partition: num_rows_per_partition__.unwrap_or_default(),
setup_watermark_table_on_destination: setup_watermark_table_on_destination__.unwrap_or_default(),
})
}
}
Expand Down
3 changes: 3 additions & 0 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,9 @@ message QRepConfig {
// and instead uses the number of rows per partition to determine
// how many rows to process per batch.
uint32 num_rows_per_partition = 16;

// Creates the watermark table on the destination as-is, can be used for some queries.
bool setup_watermark_table_on_destination = 17;
}

message QRepPartition {
Expand Down
20 changes: 20 additions & 0 deletions ui/grpc_generated/flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,8 @@ export interface QRepConfig {
* how many rows to process per batch.
*/
numRowsPerPartition: number;
/** Creates the watermark table on the destination as-is, can be used for some queries. */
setupWatermarkTableOnDestination: boolean;
}

export interface QRepPartition {
Expand Down Expand Up @@ -4713,6 +4715,7 @@ function createBaseQRepConfig(): QRepConfig {
writeMode: undefined,
stagingPath: "",
numRowsPerPartition: 0,
setupWatermarkTableOnDestination: false,
};
}

Expand Down Expand Up @@ -4766,6 +4769,9 @@ export const QRepConfig = {
if (message.numRowsPerPartition !== 0) {
writer.uint32(128).uint32(message.numRowsPerPartition);
}
if (message.setupWatermarkTableOnDestination === true) {
writer.uint32(136).bool(message.setupWatermarkTableOnDestination);
}
return writer;
},

Expand Down Expand Up @@ -4888,6 +4894,13 @@ export const QRepConfig = {

message.numRowsPerPartition = reader.uint32();
continue;
case 17:
if (tag !== 136) {
break;
}

message.setupWatermarkTableOnDestination = reader.bool();
continue;
}
if ((tag & 7) === 4 || tag === 0) {
break;
Expand Down Expand Up @@ -4917,6 +4930,9 @@ export const QRepConfig = {
writeMode: isSet(object.writeMode) ? QRepWriteMode.fromJSON(object.writeMode) : undefined,
stagingPath: isSet(object.stagingPath) ? String(object.stagingPath) : "",
numRowsPerPartition: isSet(object.numRowsPerPartition) ? Number(object.numRowsPerPartition) : 0,
setupWatermarkTableOnDestination: isSet(object.setupWatermarkTableOnDestination)
? Boolean(object.setupWatermarkTableOnDestination)
: false,
};
},

Expand Down Expand Up @@ -4970,6 +4986,9 @@ export const QRepConfig = {
if (message.numRowsPerPartition !== 0) {
obj.numRowsPerPartition = Math.round(message.numRowsPerPartition);
}
if (message.setupWatermarkTableOnDestination === true) {
obj.setupWatermarkTableOnDestination = message.setupWatermarkTableOnDestination;
}
return obj;
},

Expand Down Expand Up @@ -5000,6 +5019,7 @@ export const QRepConfig = {
: undefined;
message.stagingPath = object.stagingPath ?? "";
message.numRowsPerPartition = object.numRowsPerPartition ?? 0;
message.setupWatermarkTableOnDestination = object.setupWatermarkTableOnDestination ?? false;
return message;
},
};
Expand Down

0 comments on commit be08323

Please sign in to comment.