Skip to content

Commit fb90f2a

Browse files
authored
RESYNC MIRROR for QRep Snowflake mirrors (#618)
1 parent 75519d9 commit fb90f2a

File tree

6 files changed

+265
-130
lines changed

6 files changed

+265
-130
lines changed

flow/activities/flowable.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -680,7 +680,7 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context, config *protos.
680680

681681
func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context,
682682
config *protos.QRepConfig, last *protos.QRepPartition) error {
683-
if config.SourcePeer.Type != protos.DBType_POSTGRES {
683+
if config.SourcePeer.Type != protos.DBType_POSTGRES || last.Range == nil {
684684
return nil
685685
}
686686
waitBetweenBatches := 5 * time.Second

flow/workflows/qrep_flow.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ func (q *QRepFlowExecution) handleTableRenameForResync(ctx workflow.Context, sta
313313
}
314314
q.config.DestinationTableIdentifier = oldTableIdentifier
315315
}
316-
state.NeedsResync = true
316+
state.NeedsResync = false
317317
return nil
318318
}
319319

nexus/analyzer/src/lib.rs

+26
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,11 @@ pub enum PeerDDL {
123123
if_exists: bool,
124124
flow_job_name: String,
125125
},
126+
ResyncMirror {
127+
if_exists: bool,
128+
mirror_name: String,
129+
query_string: Option<String>,
130+
},
126131
}
127132

128133
impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> {
@@ -378,6 +383,27 @@ impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> {
378383
if_exists: *if_exists,
379384
peer_name: peer_name.to_string().to_lowercase(),
380385
})),
386+
Statement::ResyncMirror {
387+
if_exists,
388+
mirror_name,
389+
with_options,
390+
} => {
391+
let mut raw_options = HashMap::new();
392+
for option in with_options {
393+
raw_options.insert(&option.name.value as &str, &option.value);
394+
}
395+
396+
let query_string = match raw_options.remove("query_string") {
397+
Some(sqlparser::ast::Value::SingleQuotedString(s)) => Some(s.clone()),
398+
_ => None,
399+
};
400+
401+
Ok(Some(PeerDDL::ResyncMirror {
402+
if_exists: *if_exists,
403+
mirror_name: mirror_name.to_string().to_lowercase(),
404+
query_string,
405+
}))
406+
}
381407
_ => Ok(None),
382408
}
383409
}

nexus/catalog/src/lib.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,7 @@ impl Catalog {
365365
Ok(table_identifier_parts.join("."))
366366
}
367367

368-
pub async fn create_flow_job_entry(&self, job: &FlowJob) -> anyhow::Result<()> {
368+
pub async fn create_cdc_flow_job_entry(&self, job: &FlowJob) -> anyhow::Result<()> {
369369
let source_peer_id = self
370370
.get_peer_id_i32(&job.source_peer)
371371
.await
@@ -424,7 +424,7 @@ impl Catalog {
424424
.prepare_typed("SELECT f.*, sp.name as source_peer_name, dp.name as destination_peer_name FROM flows as f
425425
INNER JOIN peers as sp ON f.source_peer = sp.id
426426
INNER JOIN peers as dp ON f.destination_peer = dp.id
427-
WHERE f.name = $1", &[types::Type::TEXT])
427+
WHERE f.name = $1 AND f.query_string IS NOT NULL", &[types::Type::TEXT])
428428
.await?;
429429

430430
let job = self.pg.query_opt(&stmt, &[&job_name]).await?.map(|row| {

0 commit comments

Comments
 (0)