Skip to content

Commit

Permalink
RESYNC MIRROR for QRep Snowflake mirrors
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Nov 6, 2023
1 parent 368fb9d commit 445fdce
Show file tree
Hide file tree
Showing 6 changed files with 265 additions and 131 deletions.
2 changes: 1 addition & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context, config *protos.

func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context,
config *protos.QRepConfig, last *protos.QRepPartition) error {
if config.SourcePeer.Type != protos.DBType_POSTGRES {
if config.SourcePeer.Type != protos.DBType_POSTGRES || last.Range == nil {
return nil
}
waitBetweenBatches := 5 * time.Second
Expand Down
2 changes: 1 addition & 1 deletion flow/workflows/qrep_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func (q *QRepFlowExecution) handleTableRenameForResync(ctx workflow.Context, sta
}
q.config.DestinationTableIdentifier = oldTableIdentifier
}
state.NeedsResync = true
state.NeedsResync = false
return nil
}

Expand Down
26 changes: 26 additions & 0 deletions nexus/analyzer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ pub enum PeerDDL {
if_exists: bool,
flow_job_name: String,
},
ResyncMirror {
if_exists: bool,
mirror_name: String,
query_string: Option<String>,
},
}

impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> {
Expand Down Expand Up @@ -378,6 +383,27 @@ impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> {
if_exists: *if_exists,
peer_name: peer_name.to_string().to_lowercase(),
})),
Statement::ResyncMirror {
if_exists,
mirror_name,
with_options,
} => {
let mut raw_options = HashMap::new();
for option in with_options {
raw_options.insert(&option.name.value as &str, &option.value);
}

let query_string = match raw_options.remove("query_string") {
Some(sqlparser::ast::Value::SingleQuotedString(s)) => Some(s.clone()),
_ => None,
};

Ok(Some(PeerDDL::ResyncMirror {
if_exists: *if_exists,
mirror_name: mirror_name.to_string().to_lowercase(),
query_string,
}))
}
_ => Ok(None),
}
}
Expand Down
4 changes: 2 additions & 2 deletions nexus/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ impl Catalog {
Ok(table_identifier_parts.join("."))
}

pub async fn create_flow_job_entry(&self, job: &FlowJob) -> anyhow::Result<()> {
pub async fn create_cdc_flow_job_entry(&self, job: &FlowJob) -> anyhow::Result<()> {
let source_peer_id = self
.get_peer_id_i32(&job.source_peer)
.await
Expand Down Expand Up @@ -424,7 +424,7 @@ impl Catalog {
.prepare_typed("SELECT f.*, sp.name as source_peer_name, dp.name as destination_peer_name FROM flows as f
INNER JOIN peers as sp ON f.source_peer = sp.id
INNER JOIN peers as dp ON f.destination_peer = dp.id
WHERE f.name = $1", &[types::Type::TEXT])
WHERE f.name = $1 AND f.query_string IS NOT NULL", &[types::Type::TEXT])
.await?;

let job = self.pg.query_opt(&stmt, &[&job_name]).await?.map(|row| {
Expand Down
Loading

0 comments on commit 445fdce

Please sign in to comment.