
Commit

feat: opti init meta, close #43
wentaojin committed Aug 9, 2024
1 parent cc58d1f commit 1e82a7d
Showing 41 changed files with 1,213 additions and 919 deletions.
1 change: 1 addition & 0 deletions component/cli/migrate/csv.go
@@ -42,6 +42,7 @@ type CsvConfig struct {
type CsvMigrateParam struct {
TableThread uint64 `toml:"table-thread" json:"tableThread"`
BatchSize uint64 `toml:"batch-size" json:"batchSize"`
+WriteThread uint64 `toml:"write-thread" json:"writeThread"`
DiskUsageFactor string `toml:"disk-usage-factor" json:"diskUsageFactor"`
Header bool `toml:"header" json:"header"`
Separator string `toml:"separator" json:"separator"`
1 change: 1 addition & 0 deletions component/cli/migrate/scan.go
@@ -47,6 +47,7 @@ type DataScanRule struct {

type DataScanParam struct {
TableThread uint64 `toml:"table-thread" json:"tableThread"`
+WriteThread uint64 `toml:"write-thread" json:"writeThread"`
BatchSize uint64 `toml:"batch-size" json:"batchSize"`
ChunkSize uint64 `toml:"chunk-size" json:"chunkSize"`
SqlThreadS uint64 `toml:"sql-thread-s" json:"sqlThreadS"`
1 change: 1 addition & 0 deletions component/cli/migrate/sql.go
@@ -40,6 +40,7 @@ type SqlConfig struct {

type SqlMigrateParam struct {
BatchSize uint64 `toml:"batch-size" json:"batchSize"`
+WriteThread uint64 `toml:"write-thread" json:"writeThread"`
SqlThreadS uint64 `toml:"sql-thread-s" json:"sqlThreadS"`
SqlThreadT uint64 `toml:"sql-thread-t" json:"sqlThreadT"`
SqlHintT string `toml:"sql-hint-t" json:"sqlHintT"`
1 change: 1 addition & 0 deletions component/cli/migrate/statement.go
@@ -41,6 +41,7 @@ type StatementConfig struct {

type StatementMigrateParam struct {
TableThread uint64 `toml:"table-thread" json:"tableThread"`
+WriteThread uint64 `toml:"write-thread" json:"writeThread"`
BatchSize uint64 `toml:"batch-size" json:"batchSize"`
ChunkSize uint64 `toml:"chunk-size" json:"chunkSize"`
SqlThreadS uint64 `toml:"sql-thread-s" json:"sqlThreadS"`
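Across these four parameter structs the commit adds the same write-thread knob next to batch-size. A minimal, hypothetical sketch of how a consumer might guard the new field (the struct is trimmed to the relevant tags; normalizeWriteThread and its fallback of 4 are illustrative, not the repository's actual behavior):

```go
package main

import "fmt"

// StatementMigrateParam mirrors only the fields relevant to this change;
// write-thread is the new knob added alongside batch-size in this commit.
type StatementMigrateParam struct {
	TableThread uint64 `toml:"table-thread" json:"tableThread"`
	WriteThread uint64 `toml:"write-thread" json:"writeThread"`
	BatchSize   uint64 `toml:"batch-size" json:"batchSize"`
}

// normalizeWriteThread is a hypothetical helper (not part of the commit): it
// guards against a zero value coming from an older config file that predates
// the new field. The fallback of 4 is purely illustrative.
func normalizeWriteThread(p StatementMigrateParam) uint64 {
	if p.WriteThread == 0 {
		return 4
	}
	return p.WriteThread
}

func main() {
	old := StatementMigrateParam{TableThread: 8, BatchSize: 500}
	fmt.Println(normalizeWriteThread(old)) // prints 4
}
```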
76 changes: 34 additions & 42 deletions database/oracle/taskflow/csv_migrate.go
@@ -78,14 +78,18 @@ func (cmt *CsvMigrateTask) Start() error {

logger.Info("csv migrate task inspect migrate task",
zap.String("task_name", cmt.Task.TaskName), zap.String("task_mode", cmt.Task.TaskMode), zap.String("task_flow", cmt.Task.TaskFlow))
-dbVersion, _, err := processor.InspectOracleMigrateTask(cmt.Task.TaskName, cmt.Task.TaskFlow, cmt.Task.TaskMode, databaseS, stringutil.StringUpper(cmt.DatasourceS.ConnectCharset), stringutil.StringUpper(cmt.DatasourceT.ConnectCharset))
+_, err = processor.InspectOracleMigrateTask(cmt.Task.TaskName, cmt.Task.TaskFlow, cmt.Task.TaskMode, databaseS, stringutil.StringUpper(cmt.DatasourceS.ConnectCharset), stringutil.StringUpper(cmt.DatasourceT.ConnectCharset))
if err != nil {
return err
}

logger.Info("csv migrate task init task",
zap.String("task_name", cmt.Task.TaskName), zap.String("task_mode", cmt.Task.TaskMode), zap.String("task_flow", cmt.Task.TaskFlow))
-err = cmt.InitCsvMigrateTask(databaseS, dbVersion, schemaRoute)
+dbVersionS, err := databaseS.GetDatabaseVersion()
+if err != nil {
+return err
+}
+err = cmt.InitCsvMigrateTask(databaseS, dbVersionS, schemaRoute)
if err != nil {
return err
}
@@ -876,26 +880,20 @@ func (cmt *CsvMigrateTask) InitCsvMigrateTask(databaseS database.IDatabase, dbVe
})
}

-err = model.Transaction(gCtx, func(txnCtx context.Context) error {
-err = model.GetIDataMigrateTaskRW().CreateInBatchDataMigrateTask(txnCtx, metas, int(cmt.TaskParams.BatchSize))
-if err != nil {
-return err
-}
-_, err = model.GetIDataMigrateSummaryRW().CreateDataMigrateSummary(txnCtx, &task.DataMigrateSummary{
-TaskName: cmt.Task.TaskName,
-SchemaNameS: attsRule.SchemaNameS,
-TableNameS: attsRule.TableNameS,
-SchemaNameT: attsRule.SchemaNameT,
-TableNameT: attsRule.TableNameT,
-SnapshotPointS: globalScn,
-TableRowsS: tableRows,
-TableSizeS: tableSize,
-ChunkTotals: uint64(len(upstreamBuckets)),
-})
-if err != nil {
-return err
-}
-return nil
-})
+err = model.GetIDataMigrateTaskRW().CreateInBatchDataMigrateTask(gCtx, metas, int(cmt.TaskParams.WriteThread), int(cmt.TaskParams.BatchSize))
+if err != nil {
+return err
+}
+_, err = model.GetIDataMigrateSummaryRW().CreateDataMigrateSummary(gCtx, &task.DataMigrateSummary{
+TaskName: cmt.Task.TaskName,
+SchemaNameS: attsRule.SchemaNameS,
+TableNameS: attsRule.TableNameS,
+SchemaNameT: attsRule.SchemaNameT,
+TableNameT: attsRule.TableNameT,
+SnapshotPointS: globalScn,
+TableRowsS: tableRows,
+TableSizeS: tableSize,
+ChunkTotals: uint64(len(upstreamBuckets)),
+})
if err != nil {
return err
@@ -1010,26 +1008,20 @@ func (cmt *CsvMigrateTask) InitCsvMigrateTask(databaseS database.IDatabase, dbVe
})
}

-err = model.Transaction(gCtx, func(txnCtx context.Context) error {
-err = model.GetIDataMigrateTaskRW().CreateInBatchDataMigrateTask(txnCtx, metas, int(cmt.TaskParams.BatchSize))
-if err != nil {
-return err
-}
-_, err = model.GetIDataMigrateSummaryRW().CreateDataMigrateSummary(txnCtx, &task.DataMigrateSummary{
-TaskName: cmt.Task.TaskName,
-SchemaNameS: attsRule.SchemaNameS,
-TableNameS: attsRule.TableNameS,
-SchemaNameT: attsRule.SchemaNameT,
-TableNameT: attsRule.TableNameT,
-SnapshotPointS: globalScn,
-TableRowsS: tableRows,
-TableSizeS: tableSize,
-ChunkTotals: uint64(len(chunks)),
-})
-if err != nil {
-return err
-}
-return nil
-})
+err = model.GetIDataMigrateTaskRW().CreateInBatchDataMigrateTask(gCtx, metas, int(cmt.TaskParams.WriteThread), int(cmt.TaskParams.BatchSize))
+if err != nil {
+return err
+}
+_, err = model.GetIDataMigrateSummaryRW().CreateDataMigrateSummary(gCtx, &task.DataMigrateSummary{
+TaskName: cmt.Task.TaskName,
+SchemaNameS: attsRule.SchemaNameS,
+TableNameS: attsRule.TableNameS,
+SchemaNameT: attsRule.SchemaNameT,
+TableNameT: attsRule.TableNameT,
+SnapshotPointS: globalScn,
+TableRowsS: tableRows,
+TableSizeS: tableSize,
+ChunkTotals: uint64(len(chunks)),
+})
if err != nil {
return err
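The recurring change in this file (mirrored in data_scan.go and stmt_migrate.go below) is that the metadata inserts no longer run inside a single model.Transaction; instead the new WriteThread value is handed to the batch writers together with BatchSize. The CreateInBatchDataMigrateTask implementation itself is not shown in this diff, so the following is only a sketch of what a writeThread-limited batch writer could look like, assuming an errgroup-style worker pool (Meta and createInBatch are illustrative names, not the project's API):

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// Meta stands in for one data-migrate task row built in InitCsvMigrateTask.
type Meta struct{ ChunkID int }

// createInBatch sketches the post-commit shape of a batch writer: metas are
// split into slices of batchSize and written by at most writeThread
// concurrent workers instead of inside one wrapping transaction.
func createInBatch(ctx context.Context, metas []Meta, writeThread, batchSize int) error {
	g, gCtx := errgroup.WithContext(ctx)
	g.SetLimit(writeThread)
	for start := 0; start < len(metas); start += batchSize {
		end := start + batchSize
		if end > len(metas) {
			end = len(metas)
		}
		batch := metas[start:end] // new slice header per iteration, safe to capture
		g.Go(func() error {
			// Placeholder for the real INSERT; abort early if a sibling worker failed.
			if err := gCtx.Err(); err != nil {
				return err
			}
			fmt.Printf("writing batch of %d rows\n", len(batch))
			return nil
		})
	}
	return g.Wait()
}

func main() {
	_ = createInBatch(context.Background(), make([]Meta, 23), 4, 10)
}
```

Dropping the wrapping transaction presumably trades the atomicity of the init step for parallel write throughput, so partially written metadata would have to be tolerated or cleaned up on retry.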
40 changes: 19 additions & 21 deletions database/oracle/taskflow/data_scan.go
@@ -72,14 +72,18 @@ func (dst *DataScanTask) Start() error {
logger.Info("data scan task inspect migrate task",
zap.String("task_name", dst.Task.TaskName), zap.String("task_mode", dst.Task.TaskMode), zap.String("task_flow", dst.Task.TaskFlow))

-dbVersion, _, err := processor.InspectOracleMigrateTask(dst.Task.TaskName, dst.Task.TaskFlow, dst.Task.TaskMode, databaseS, stringutil.StringUpper(dst.DatasourceS.ConnectCharset), stringutil.StringUpper(dst.DatasourceT.ConnectCharset))
+_, err = processor.InspectOracleMigrateTask(dst.Task.TaskName, dst.Task.TaskFlow, dst.Task.TaskMode, databaseS, stringutil.StringUpper(dst.DatasourceS.ConnectCharset), stringutil.StringUpper(dst.DatasourceT.ConnectCharset))
if err != nil {
return err
}

logger.Info("data scan task init task",
zap.String("task_name", dst.Task.TaskName), zap.String("task_mode", dst.Task.TaskMode), zap.String("task_flow", dst.Task.TaskFlow))
-err = dst.initDataScanTask(databaseS, dbVersion, schemaNameS)
+dbVersionS, err := databaseS.GetDatabaseVersion()
+if err != nil {
+return err
+}
+err = dst.initDataScanTask(databaseS, dbVersionS, schemaNameS)
if err != nil {
return err
}
@@ -778,24 +782,18 @@ func (dst *DataScanTask) initDataScanTask(databaseS database.IDatabase, dbVersio
})
}

-err = model.Transaction(gCtx, func(txnCtx context.Context) error {
-err = model.GetIDataScanTaskRW().CreateInBatchDataScanTask(txnCtx, metas, int(dst.TaskParams.BatchSize))
-if err != nil {
-return err
-}
-_, err = model.GetIDataScanSummaryRW().CreateDataScanSummary(txnCtx, &task.DataScanSummary{
-TaskName: dst.Task.TaskName,
-SchemaNameS: attsRule.SchemaNameS,
-TableNameS: attsRule.TableNameS,
-SnapshotPointS: globalScn,
-TableRowsS: tableRows,
-TableSizeS: tableSize,
-ChunkTotals: uint64(len(upstreamBuckets)),
-})
-if err != nil {
-return err
-}
-return nil
-})
+err = model.GetIDataScanTaskRW().CreateInBatchDataScanTask(gCtx, metas, int(dst.TaskParams.WriteThread), int(dst.TaskParams.BatchSize))
+if err != nil {
+return err
+}
+_, err = model.GetIDataScanSummaryRW().CreateDataScanSummary(gCtx, &task.DataScanSummary{
+TaskName: dst.Task.TaskName,
+SchemaNameS: attsRule.SchemaNameS,
+TableNameS: attsRule.TableNameS,
+SnapshotPointS: globalScn,
+TableRowsS: tableRows,
+TableSizeS: tableSize,
+ChunkTotals: uint64(len(upstreamBuckets)),
+})
if err != nil {
return err
@@ -890,7 +888,7 @@ func (dst *DataScanTask) initDataScanTask(databaseS database.IDatabase, dbVersio
}

err = model.Transaction(gCtx, func(txnCtx context.Context) error {
-err = model.GetIDataScanTaskRW().CreateInBatchDataScanTask(txnCtx, metas, int(dst.TaskParams.BatchSize))
+err = model.GetIDataScanTaskRW().CreateInBatchDataScanTask(txnCtx, metas, int(dst.TaskParams.WriteThread), int(dst.TaskParams.BatchSize))
if err != nil {
return err
}
4 changes: 2 additions & 2 deletions database/oracle/taskflow/sql_migrate.go
@@ -67,7 +67,7 @@ func (smt *SqlMigrateTask) Start() error {
zap.String("task_name", smt.Task.TaskName),
zap.String("task_mode", smt.Task.TaskMode),
zap.String("task_flow", smt.Task.TaskFlow))
-_, _, err = processor.InspectOracleMigrateTask(smt.Task.TaskName, smt.Task.TaskFlow, smt.Task.TaskMode, databaseS, stringutil.StringUpper(smt.DatasourceS.ConnectCharset), stringutil.StringUpper(smt.DatasourceT.ConnectCharset))
+_, err = processor.InspectOracleMigrateTask(smt.Task.TaskName, smt.Task.TaskFlow, smt.Task.TaskMode, databaseS, stringutil.StringUpper(smt.DatasourceS.ConnectCharset), stringutil.StringUpper(smt.DatasourceT.ConnectCharset))
if err != nil {
return err
}
@@ -401,7 +401,7 @@ func (smt *SqlMigrateTask) InitSqlMigrateTask(databaseS database.IDatabase) erro
if err != nil {
return err
}
-err = model.GetISqlMigrateTaskRW().CreateInBatchSqlMigrateTask(txnCtx, sqlMigrateTasks, constant.DefaultRecordCreateBatchSize)
+err = model.GetISqlMigrateTaskRW().CreateInBatchSqlMigrateTask(txnCtx, sqlMigrateTasks, int(smt.TaskParams.WriteThread), int(smt.TaskParams.BatchSize))
if err != nil {
return err
}
76 changes: 34 additions & 42 deletions database/oracle/taskflow/stmt_migrate.go
@@ -80,14 +80,18 @@ func (stm *StmtMigrateTask) Start() error {
logger.Info("stmt migrate task inspect migrate task",
zap.String("task_name", stm.Task.TaskName), zap.String("task_mode", stm.Task.TaskMode), zap.String("task_flow", stm.Task.TaskFlow))

-dbVersion, _, err := processor.InspectOracleMigrateTask(stm.Task.TaskName, stm.Task.TaskFlow, stm.Task.TaskMode, databaseS, stringutil.StringUpper(stm.DatasourceS.ConnectCharset), stringutil.StringUpper(stm.DatasourceT.ConnectCharset))
+_, err = processor.InspectOracleMigrateTask(stm.Task.TaskName, stm.Task.TaskFlow, stm.Task.TaskMode, databaseS, stringutil.StringUpper(stm.DatasourceS.ConnectCharset), stringutil.StringUpper(stm.DatasourceT.ConnectCharset))
if err != nil {
return err
}

logger.Info("stmt migrate task init task",
zap.String("task_name", stm.Task.TaskName), zap.String("task_mode", stm.Task.TaskMode), zap.String("task_flow", stm.Task.TaskFlow))
-err = stm.initStmtMigrateTask(databaseS, dbVersion, schemaRoute)
+dbVersionS, err := databaseS.GetDatabaseVersion()
+if err != nil {
+return err
+}
+err = stm.initStmtMigrateTask(databaseS, dbVersionS, schemaRoute)
if err != nil {
return err
}
@@ -853,26 +857,20 @@ func (stm *StmtMigrateTask) initStmtMigrateTask(databaseS database.IDatabase, db
})
}

-err = model.Transaction(gCtx, func(txnCtx context.Context) error {
-err = model.GetIDataMigrateTaskRW().CreateInBatchDataMigrateTask(txnCtx, metas, int(stm.TaskParams.BatchSize))
-if err != nil {
-return err
-}
-_, err = model.GetIDataMigrateSummaryRW().CreateDataMigrateSummary(txnCtx, &task.DataMigrateSummary{
-TaskName: stm.Task.TaskName,
-SchemaNameS: attsRule.SchemaNameS,
-TableNameS: attsRule.TableNameS,
-SchemaNameT: attsRule.SchemaNameT,
-TableNameT: attsRule.TableNameT,
-SnapshotPointS: globalScn,
-TableRowsS: tableRows,
-TableSizeS: tableSize,
-ChunkTotals: uint64(len(upstreamBuckets)),
-})
-if err != nil {
-return err
-}
-return nil
-})
+err = model.GetIDataMigrateTaskRW().CreateInBatchDataMigrateTask(gCtx, metas, int(stm.TaskParams.WriteThread), int(stm.TaskParams.BatchSize))
+if err != nil {
+return err
+}
+_, err = model.GetIDataMigrateSummaryRW().CreateDataMigrateSummary(gCtx, &task.DataMigrateSummary{
+TaskName: stm.Task.TaskName,
+SchemaNameS: attsRule.SchemaNameS,
+TableNameS: attsRule.TableNameS,
+SchemaNameT: attsRule.SchemaNameT,
+TableNameT: attsRule.TableNameT,
+SnapshotPointS: globalScn,
+TableRowsS: tableRows,
+TableSizeS: tableSize,
+ChunkTotals: uint64(len(upstreamBuckets)),
+})
if err != nil {
return err
@@ -985,26 +983,20 @@ func (stm *StmtMigrateTask) initStmtMigrateTask(databaseS database.IDatabase, db
})
}

-err = model.Transaction(gCtx, func(txnCtx context.Context) error {
-err = model.GetIDataMigrateTaskRW().CreateInBatchDataMigrateTask(txnCtx, metas, int(stm.TaskParams.BatchSize))
-if err != nil {
-return err
-}
-_, err = model.GetIDataMigrateSummaryRW().CreateDataMigrateSummary(txnCtx, &task.DataMigrateSummary{
-TaskName: stm.Task.TaskName,
-SchemaNameS: attsRule.SchemaNameS,
-TableNameS: attsRule.TableNameS,
-SchemaNameT: attsRule.SchemaNameT,
-TableNameT: attsRule.TableNameT,
-SnapshotPointS: globalScn,
-TableRowsS: tableRows,
-TableSizeS: tableSize,
-ChunkTotals: uint64(len(chunks)),
-})
-if err != nil {
-return err
-}
-return nil
-})
+err = model.GetIDataMigrateTaskRW().CreateInBatchDataMigrateTask(gCtx, metas, int(stm.TaskParams.WriteThread), int(stm.TaskParams.BatchSize))
+if err != nil {
+return err
+}
+_, err = model.GetIDataMigrateSummaryRW().CreateDataMigrateSummary(gCtx, &task.DataMigrateSummary{
+TaskName: stm.Task.TaskName,
+SchemaNameS: attsRule.SchemaNameS,
+TableNameS: attsRule.TableNameS,
+SchemaNameT: attsRule.SchemaNameT,
+TableNameT: attsRule.TableNameT,
+SnapshotPointS: globalScn,
+TableRowsS: tableRows,
+TableSizeS: tableSize,
+ChunkTotals: uint64(len(chunks)),
+})
if err != nil {
return err
2 changes: 1 addition & 1 deletion database/oracle/taskflow/struct_compare.go
@@ -93,7 +93,7 @@ func (dmt *StructCompareTask) Start() error {

logger.Info("struct compare task inspect migrate task",
zap.String("task_name", dmt.Task.TaskName), zap.String("task_mode", dmt.Task.TaskMode), zap.String("task_flow", dmt.Task.TaskFlow))
-_, _, err = processor.InspectOracleMigrateTask(dmt.Task.TaskName, dmt.Task.TaskFlow, dmt.Task.TaskMode, databaseS, stringutil.StringUpper(dmt.DatasourceS.ConnectCharset), stringutil.StringUpper(dmt.DatasourceT.ConnectCharset))
+_, err = processor.InspectOracleMigrateTask(dmt.Task.TaskName, dmt.Task.TaskFlow, dmt.Task.TaskMode, databaseS, stringutil.StringUpper(dmt.DatasourceS.ConnectCharset), stringutil.StringUpper(dmt.DatasourceT.ConnectCharset))
if err != nil {
return err
}
27 changes: 23 additions & 4 deletions database/postgresql/struct_migrate.go
@@ -910,13 +910,32 @@ func (d *Database) GetDatabaseTableColumnNameSqlDimensions(sqlStr string) ([]str
}

func (d *Database) GetDatabaseTableRows(schemaName, tableName string) (uint64, error) {
-//TODO implement me
-panic("implement me")
+_, res, err := d.GeneralQuery(fmt.Sprintf(`SELECT
+reltuples AS row_counts
+FROM pg_class
+WHERE relkind = 'r'
+AND relnamespace = (SELECT oid from pg_namespace where nspname = '%s')
+AND relname = '%s'`, schemaName, tableName))
+if err != nil {
+return 0, err
+}
+size, err := stringutil.StrconvUintBitSize(res[0]["row_counts"], 64)
+if err != nil {
+return 0, err
+}
+return size, nil
}

func (d *Database) GetDatabaseTableSize(schemaName, tableName string) (float64, error) {
-//TODO implement me
-panic("implement me")
+_, res, err := d.GeneralQuery(fmt.Sprintf(`SELECT pg_total_relation_size('%s.%s')/1024/1024 AS mb`, schemaName, tableName))
+if err != nil {
+return 0, err
+}
+size, err := stringutil.StrconvFloatBitSize(res[0]["mb"], 64)
+if err != nil {
+return 0, err
+}
+return size, nil
}

func (d *Database) GetDatabaseTableChunkTask(taskName, schemaName, tableName string, chunkSize uint64, callTimeout uint64) ([]map[string]string, error) {
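The new PostgreSQL implementations read pg_class.reltuples for the row count and pg_total_relation_size (converted to MB) for the table size. Note that reltuples is a planner estimate maintained by ANALYZE/autovacuum, so it can be stale, and on recent PostgreSQL versions it reads -1 for a table that has never been analyzed. A hypothetical exact-count fallback, shown only as a sketch (countExactRows and its package are not part of the repository):

```go
package pgutil

import (
	"context"
	"database/sql"
	"fmt"
)

// countExactRows trades the speed of the reltuples estimate for an exact
// COUNT(*). Identifiers are quoted with %q and assumed not to contain
// embedded double quotes; adjust the quoting for hostile schema names.
func countExactRows(ctx context.Context, db *sql.DB, schemaName, tableName string) (uint64, error) {
	var rows uint64
	query := fmt.Sprintf(`SELECT count(*) FROM %q.%q`, schemaName, tableName)
	if err := db.QueryRowContext(ctx, query).Scan(&rows); err != nil {
		return 0, err
	}
	return rows, nil
}
```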