Skip to content

Commit

Permalink
Merge branch 'main' into remove-bq-lock
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Dec 25, 2023
2 parents 5e9ae02 + 0806d33 commit 9f04fc6
Show file tree
Hide file tree
Showing 19 changed files with 316 additions and 328 deletions.
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -913,7 +913,7 @@ func (c *BigQueryConnector) getUpdateMetadataStmt(jobName string, lastSyncedChec
c.datasetID, MirrorJobsTable, jobName, lastSyncedCheckpointID, batchID)
if hasJob {
jobStatement = fmt.Sprintf(
"UPDATE %s.%s SET offset = %d,sync_batch_id=%d WHERE mirror_job_name = '%s';",
"UPDATE %s.%s SET offset=GREATEST(offset,%d),sync_batch_id=%d WHERE mirror_job_name = '%s';",
c.datasetID, MirrorJobsTable, lastSyncedCheckpointID, batchID, jobName)
}

Expand Down
12 changes: 6 additions & 6 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,10 @@ const (
rawTableMultiValueInsertSQL = "INSERT INTO %s.%s VALUES%s"
createNormalizedTableSQL = "CREATE TABLE IF NOT EXISTS %s(%s)"
toVariantColumnName = "VAR_COLS"
mergeStatementSQL = `MERGE INTO %s TARGET USING (WITH VARIANT_CONVERTED AS (SELECT _PEERDB_UID,
_PEERDB_TIMESTAMP,
TO_VARIANT(PARSE_JSON(_PEERDB_DATA)) %s,_PEERDB_RECORD_TYPE,_PEERDB_MATCH_DATA,_PEERDB_BATCH_ID,
_PEERDB_UNCHANGED_TOAST_COLUMNS FROM
_PEERDB_INTERNAL.%s WHERE _PEERDB_BATCH_ID > %d AND _PEERDB_BATCH_ID <= %d AND
mergeStatementSQL = `MERGE INTO %s TARGET USING (WITH VARIANT_CONVERTED AS (
SELECT _PEERDB_UID,_PEERDB_TIMESTAMP,TO_VARIANT(PARSE_JSON(_PEERDB_DATA)) %s,_PEERDB_RECORD_TYPE,
_PEERDB_MATCH_DATA,_PEERDB_BATCH_ID,_PEERDB_UNCHANGED_TOAST_COLUMNS
FROM _PEERDB_INTERNAL.%s WHERE _PEERDB_BATCH_ID > %d AND _PEERDB_BATCH_ID <= %d AND
_PEERDB_DESTINATION_TABLE_NAME = ? ), FLATTENED AS
(SELECT _PEERDB_UID,_PEERDB_TIMESTAMP,_PEERDB_RECORD_TYPE,_PEERDB_MATCH_DATA,_PEERDB_BATCH_ID,
_PEERDB_UNCHANGED_TOAST_COLUMNS,%s
Expand All @@ -66,7 +65,8 @@ const (

insertJobMetadataSQL = "INSERT INTO %s.%s VALUES (?,?,?,?)"

updateMetadataForSyncRecordsSQL = "UPDATE %s.%s SET OFFSET=?, SYNC_BATCH_ID=? WHERE MIRROR_JOB_NAME=?"
updateMetadataForSyncRecordsSQL = `UPDATE %s.%s SET OFFSET=GREATEST(OFFSET, ?), SYNC_BATCH_ID=?
WHERE MIRROR_JOB_NAME=?`
updateMetadataForNormalizeRecordsSQL = "UPDATE %s.%s SET NORMALIZE_BATCH_ID=? WHERE MIRROR_JOB_NAME=?"

checkIfTableExistsSQL = `SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.TABLES
Expand Down
77 changes: 0 additions & 77 deletions nexus/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 0 additions & 10 deletions nexus/flow-rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,8 @@ name = "flow-rs"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_yaml = "0.9"
serde_json = "1.0"
reqwest = { version = "0.11", features = [
"blocking",
"json",
"native-tls-vendored",
] }
anyhow = "1.0"
tracing = "0.1"
tonic = "0.10"
Expand Down
2 changes: 1 addition & 1 deletion nexus/peer-snowflake/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
futures = "0.3"
ureq = { version = "2", features = ["json", "charset"] }
reqwest = { version = "0.11", features = ["json", "gzip"] }
reqwest = { version = "0.11", default-features = false, features = ["json", "gzip", "rustls-tls"] }
anyhow = "1.0"
tokio = { version = "1.21", features = ["full"] }
hex = "0.4"
Expand Down
4 changes: 1 addition & 3 deletions ui/app/login/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ export default function Login() {
const searchParams = useSearchParams();
const [pass, setPass] = useState('');
const [show, setShow] = useState(false);
const [error, setError] = useState(() =>
searchParams.has('reject') ? 'Authentication failed, please login' : ''
);
const [error, setError] = useState(() => '');
const login = () => {
fetch('/api/login', {
method: 'POST',
Expand Down
75 changes: 27 additions & 48 deletions ui/app/mirrors/create/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,45 +73,33 @@ const validateCDCFields = (
}
| undefined
)[],
setMsg: Dispatch<SetStateAction<{ ok: boolean; msg: string }>>,
config: CDCConfig
): boolean => {
): string | undefined => {
let validationErr: string | undefined;
const tablesValidity = tableMappingSchema.safeParse(tableMapping);
if (!tablesValidity.success) {
validationErr = tablesValidity.error.issues[0].message;
setMsg({ ok: false, msg: validationErr });
return false;
}
const configValidity = cdcSchema.safeParse(config);
if (!configValidity.success) {
validationErr = configValidity.error.issues[0].message;
setMsg({ ok: false, msg: validationErr });
return false;
}
setMsg({ ok: true, msg: '' });
return true;
return validationErr;
};

const validateQRepFields = (
query: string,
setMsg: Dispatch<SetStateAction<{ ok: boolean; msg: string }>>,
config: QRepConfig
): boolean => {
): string | undefined => {
if (query.length < 5) {
setMsg({ ok: false, msg: 'Query is invalid' });
return false;
return 'Query is invalid';
}

let validationErr: string | undefined;
const configValidity = qrepSchema.safeParse(config);
if (!configValidity.success) {
validationErr = configValidity.error.issues[0].message;
setMsg({ ok: false, msg: validationErr });
return false;
}
setMsg({ ok: true, msg: '' });
return true;
return validationErr;
};

interface TableMapping {
Expand Down Expand Up @@ -140,25 +128,23 @@ export const handleCreateCDC = async (
flowJobName: string,
rows: TableMapRow[],
config: CDCConfig,
setMsg: Dispatch<
SetStateAction<{
ok: boolean;
msg: string;
}>
>,
notify: (msg: string) => void,
setLoading: Dispatch<SetStateAction<boolean>>,
route: RouteCallback
) => {
const flowNameValid = flowNameSchema.safeParse(flowJobName);
if (!flowNameValid.success) {
const flowNameErr = flowNameValid.error.issues[0].message;
setMsg({ ok: false, msg: flowNameErr });
notify(flowNameErr);
return;
}

const tableNameMapping = reformattedTableMapping(rows);
const isValid = validateCDCFields(tableNameMapping, setMsg, config);
if (!isValid) return;
const fieldErr = validateCDCFields(tableNameMapping, config);
if (fieldErr) {
notify(fieldErr);
return;
}

config['tableMappings'] = tableNameMapping as TableMapping[];
config['flowJobName'] = flowJobName;
Expand All @@ -172,10 +158,7 @@ export const handleCreateCDC = async (
}

if (config.doInitialCopy == false && config.initialCopyOnly == true) {
setMsg({
ok: false,
msg: 'Initial Copy Only cannot be true if Initial Copy is false.',
});
notify('Initial Copy Only cannot be true if Initial Copy is false.');
return;
}

Expand All @@ -187,11 +170,11 @@ export const handleCreateCDC = async (
}),
}).then((res) => res.json());
if (!statusMessage.created) {
setMsg({ ok: false, msg: 'unable to create mirror.' });
notify('unable to create mirror.');
setLoading(false);
return;
}
setMsg({ ok: true, msg: 'CDC Mirror created successfully' });
notify('CDC Mirror created successfully');
route();
setLoading(false);
};
Expand All @@ -209,20 +192,15 @@ export const handleCreateQRep = async (
flowJobName: string,
query: string,
config: QRepConfig,
setMsg: Dispatch<
SetStateAction<{
ok: boolean;
msg: string;
}>
>,
notify: (msg: string) => void,
setLoading: Dispatch<SetStateAction<boolean>>,
route: RouteCallback,
xmin?: boolean
) => {
const flowNameValid = flowNameSchema.safeParse(flowJobName);
if (!flowNameValid.success) {
const flowNameErr = flowNameValid.error.issues[0].message;
setMsg({ ok: false, msg: flowNameErr });
notify(flowNameErr);
return;
}

Expand All @@ -237,16 +215,17 @@ export const handleCreateQRep = async (

if (
config.writeMode?.writeType == QRepWriteType.QREP_WRITE_MODE_UPSERT &&
!config.writeMode?.upsertKeyColumns
(!config.writeMode?.upsertKeyColumns ||
config.writeMode?.upsertKeyColumns.length == 0)
) {
setMsg({
ok: false,
msg: 'For upsert mode, unique key columns cannot be empty.',
});
notify('For upsert mode, unique key columns cannot be empty.');
return;
}
const fieldErr = validateQRepFields(query, config);
if (fieldErr) {
notify(fieldErr);
return;
}
const isValid = validateQRepFields(query, setMsg, config);
if (!isValid) return;
config.flowJobName = flowJobName;
config.query = query;

Expand All @@ -267,11 +246,11 @@ export const handleCreateQRep = async (
}
).then((res) => res.json());
if (!statusMessage.created) {
setMsg({ ok: false, msg: 'unable to create mirror.' });
notify('unable to create mirror.');
setLoading(false);
return;
}
setMsg({ ok: true, msg: 'Query Replication Mirror created successfully' });
notify('Query Replication Mirror created successfully');
route();
setLoading(false);
};
Expand Down
Loading

0 comments on commit 9f04fc6

Please sign in to comment.