Skip to content

Commit

Permalink
feat: opti perf and add fix force, close #29, close #45
Browse files Browse the repository at this point in the history
  • Loading branch information
wentaojin committed Aug 20, 2024
1 parent b63a396 commit ca92e12
Show file tree
Hide file tree
Showing 55 changed files with 6,138 additions and 5,679 deletions.
4 changes: 3 additions & 1 deletion component/cli/command/verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ type AppVerifyGen struct {
*AppVerify
task string
outputDir string
force bool
}

func (a *AppVerify) AppVerifyGen() component.Cmder {
Expand All @@ -198,6 +199,7 @@ func (a *AppVerifyGen) Cmd() *cobra.Command {
}
cmd.Flags().StringVarP(&a.task, "task", "t", "", "the data compare task")
cmd.Flags().StringVarP(&a.outputDir, "outputDir", "o", "/tmp", "the data compare task output file dir")
cmd.Flags().BoolVarP(&a.force, "force", "f", false, "the data compare task force ignore the task status success check, output fixed file")
return cmd
}

Expand Down Expand Up @@ -235,7 +237,7 @@ func (a *AppVerifyGen) RunE(cmd *cobra.Command, args []string) error {
}
}

err := service.GenDataCompareTask(context.Background(), a.Server, a.task, a.outputDir)
err := service.GenDataCompareTask(context.Background(), a.Server, a.task, a.outputDir, a.force)
if err != nil {
if errors.Is(err, errors.New(constant.TaskDatabaseStatusEqual)) {
fmt.Printf("Status: %s\n", cyan.Sprint("success"))
Expand Down
1 change: 1 addition & 0 deletions component/cli/migrate/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type SqlMigrateParam struct {
SqlThreadT uint64 `toml:"sql-thread-t" json:"sqlThreadT"`
SqlHintT string `toml:"sql-hint-t" json:"sqlHintT"`
CallTimeout uint64 `toml:"call-timeout" json:"callTimeout"`
EnableCheckpoint bool `toml:"enable-checkpoint" json:"enableCheckpoint"`
EnableConsistentRead bool `toml:"enable-consistent-read" json:"enableConsistentRead"`
EnableSafeMode bool `toml:"enable-safe-mode" json:"enableSafeMode"`
}
Expand Down
2 changes: 1 addition & 1 deletion database/data_compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type IDatabaseDataCompare interface {
GetDatabaseTableStatisticsBucket(schemeNameS, tableNameS string, consColumns map[string]string) (map[string][]structure.Bucket, error)
GetDatabaseTableStatisticsHistogram(schemeNameS, tableNameS string, consColumns map[string]string) (map[string]structure.Histogram, error)
GetDatabaseTableColumnProperties(schemaNameS, tableNameS string, columnNameSli []string) ([]map[string]string, error)
GetDatabaseTableHighestSelectivityIndex(schemaNameS, tableNameS string, compareCondField string, ignoreCondFields []string) (*structure.HighestBucket, error)
GetDatabaseTableHighestSelectivityIndex(schemaNameS, tableNameS string, compareCondField string, ignoreCondFields []string) (*structure.Selectivity, error)
GetDatabaseTableRandomValues(schemaNameS, tableNameS string, columns []string, conditions string, condArgs []interface{}, limit int, collations []string) ([][]string, error)
GetDatabaseTableCompareData(querySQL string, callTimeout int, dbCharsetS, dbCharsetT string, queryArgs []interface{}) ([]string, uint32, map[string]int64, error)
}
Expand Down
2 changes: 1 addition & 1 deletion database/data_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type IDatabaseDataMigrate interface {
GetDatabaseTableColumnNameSqlDimensions(sqlStr string) ([]string, map[string]string, map[string]string, error)
GetDatabaseTableRows(schemaName, tableName string) (uint64, error)
GetDatabaseTableSize(schemaName, tableName string) (float64, error)
GetDatabaseTableChunkTask(taskName, schemaName, tableName string, chunkSize uint64, callTimeout uint64) ([]map[string]string, error)
GetDatabaseTableChunkTask(taskName, schemaName, tableName string, chunkSize uint64, callTimeout uint64, batchSize int, dataChan chan []map[string]string) error
GetDatabaseTableChunkData(querySQL string, queryArgs []interface{}, batchSize, callTimeout int, dbCharsetS, dbCharsetT, columnDetailO string, dataChan chan []interface{}) error
GetDatabaseTableCsvData(querySQL string, queryArgs []interface{}, callTimeout int, taskFlow, dbCharsetS, dbCharsetT, columnDetailO string, escapeBackslash bool, nullValue, separator, delimiter string, dataChan chan []string) error
}
Expand Down
40 changes: 40 additions & 0 deletions database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"database/sql"
"github.com/wentaojin/dbms/database/postgresql"
"github.com/wentaojin/dbms/utils/structure"
"golang.org/x/sync/errgroup"
"strings"

"github.com/wentaojin/dbms/database/mysql"
Expand Down Expand Up @@ -57,6 +58,45 @@ type IDatabaseSchemaTableRule interface {
GenSchemaTableColumnSelectRule() (string, string, string, string, error)
}

// IDatabaseRunner abstracts the lifecycle of a database table migrate
// runner. It is consumed by IDatabaseRun, which drives all three phases:
//
//   - Init: prepare task state before migration work starts.
//   - Run: perform the migration work itself.
//   - Resume: continue previously interrupted / checkpointed work.
//
// NOTE(review): IDatabaseRun launches Init, Run and Resume concurrently,
// so implementations presumably coordinate phase ordering internally
// (e.g. via channels) — confirm against concrete runner implementations.
type IDatabaseRunner interface {
	Init() error
	Run() error
	Resume() error
}

// IDatabaseRun drives an IDatabaseRunner by starting its Init, Run and
// Resume phases as concurrent goroutines and waiting for all of them,
// returning the first non-nil error encountered (or nil on success).
//
// The errgroup-derived context is intentionally discarded: the runner's
// methods take no context, so cancellation on first error cannot be
// propagated to them here.
//
// NOTE(review): all three phases start simultaneously; the runner
// implementation is expected to sequence Init -> Run/Resume internally —
// confirm, since a strictly sequential contract would otherwise be safer.
func IDatabaseRun(ctx context.Context, i IDatabaseRunner) error {
	g, _ := errgroup.WithContext(ctx)

	// Method values are passed directly; wrapping each call in a closure
	// that re-checks and re-returns the error is redundant boilerplate.
	g.Go(i.Init)
	g.Go(i.Run)
	g.Go(i.Resume)

	// Wait returns the first error from any goroutine, or nil.
	return g.Wait()
}

func NewDatabase(ctx context.Context, datasource *datasource.Datasource, migrateOracleSchema string, callTimeout int64) (IDatabase, error) {
var (
database IDatabase
Expand Down
16 changes: 8 additions & 8 deletions database/mysql/data_compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (d *Database) GetDatabaseTableStatisticsHistogram(schemaNameS, tableNameS s
return nil, fmt.Errorf("the database table statistics histograms doesn't supported, only support tidb database, version: [%v]", res[0]["VERSION"])
}

func (d *Database) GetDatabaseTableHighestSelectivityIndex(schemaNameS, tableNameS string, compareCondField string, ignoreCondFields []string) (*structure.HighestBucket, error) {
func (d *Database) GetDatabaseTableHighestSelectivityIndex(schemaNameS, tableNameS string, compareCondField string, ignoreCondFields []string) (*structure.Selectivity, error) {
consColumns, err := d.GetDatabaseTableConstraintIndexColumn(schemaNameS, tableNameS)
if err != nil {
return nil, err
Expand Down Expand Up @@ -259,12 +259,12 @@ func (d *Database) GetDatabaseTableHighestSelectivityIndex(schemaNameS, tableNam
return nil, nil
}

highestBucket, err := structure.FindMatchDistinctCountBucket(sortHists, buckets, consColumns)
Selectivity, err := structure.FindMatchDistinctCountBucket(sortHists, buckets, consColumns)
if err != nil {
return nil, err
}

properties, err := d.GetDatabaseTableColumnProperties(schemaNameS, tableNameS, highestBucket.IndexColumn)
properties, err := d.GetDatabaseTableColumnProperties(schemaNameS, tableNameS, Selectivity.IndexColumn)
if err != nil {
return nil, err
}
Expand All @@ -273,7 +273,7 @@ func (d *Database) GetDatabaseTableHighestSelectivityIndex(schemaNameS, tableNam
columnCollations []string
datetimePrecision []string
)
for _, c := range highestBucket.IndexColumn {
for _, c := range Selectivity.IndexColumn {
for _, p := range properties {
if strings.EqualFold(p["COLUMN_NAME"], c) {
columnProps = append(columnProps, p["DATA_TYPE"])
Expand All @@ -294,10 +294,10 @@ func (d *Database) GetDatabaseTableHighestSelectivityIndex(schemaNameS, tableNam
}
}

highestBucket.ColumnDatatype = columnProps
highestBucket.ColumnCollation = columnCollations
highestBucket.DatetimePrecision = datetimePrecision
return highestBucket, nil
Selectivity.ColumnDatatype = columnProps
Selectivity.ColumnCollation = columnCollations
Selectivity.DatetimePrecision = datetimePrecision
return Selectivity, nil
}

func (d *Database) GetDatabaseTableRandomValues(schemaNameS, tableNameS string, columns []string, conditions string, condArgs []interface{}, limit int, collations []string) ([][]string, error) {
Expand Down
2 changes: 1 addition & 1 deletion database/mysql/data_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ WHERE
return sizeMB, nil
}

func (d *Database) GetDatabaseTableChunkTask(taskName, schemaName, tableName string, chunkSize uint64, callTimeout uint64) ([]map[string]string, error) {
// GetDatabaseTableChunkTask satisfies the IDatabaseDataMigrate interface
// for the MySQL-compatible database driver. Chunk-splitting of a table
// migrate task (streaming batches of chunk ranges into dataChan) is not
// implemented for MySQL yet; calling this method panics unconditionally.
func (d *Database) GetDatabaseTableChunkTask(taskName, schemaName, tableName string, chunkSize uint64, callTimeout uint64, batchSize int, dataChan chan []map[string]string) error {
//TODO implement me
panic("implement me")
}
Expand Down
16 changes: 8 additions & 8 deletions database/oracle/data_compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ WHERE TABLE_OWNER = '%s'
return hist, nil
}

func (d *Database) GetDatabaseTableHighestSelectivityIndex(schemaNameS, tableNameS string, compareCondField string, ignoreCondFields []string) (*structure.HighestBucket, error) {
func (d *Database) GetDatabaseTableHighestSelectivityIndex(schemaNameS, tableNameS string, compareCondField string, ignoreCondFields []string) (*structure.Selectivity, error) {
consColumns, err := d.GetDatabaseTableConstraintIndexColumn(schemaNameS, tableNameS)
if err != nil {
return nil, err
Expand Down Expand Up @@ -225,12 +225,12 @@ func (d *Database) GetDatabaseTableHighestSelectivityIndex(schemaNameS, tableNam
return nil, nil
}

highestBucket, err := structure.FindMatchDistinctCountBucket(sortHists, buckets, consColumns)
Selectivity, err := structure.FindMatchDistinctCountBucket(sortHists, buckets, consColumns)
if err != nil {
return nil, err
}

properties, err := d.GetDatabaseTableColumnProperties(schemaNameS, tableNameS, highestBucket.IndexColumn)
properties, err := d.GetDatabaseTableColumnProperties(schemaNameS, tableNameS, Selectivity.IndexColumn)
if err != nil {
return nil, err
}
Expand All @@ -239,7 +239,7 @@ func (d *Database) GetDatabaseTableHighestSelectivityIndex(schemaNameS, tableNam
columnCollations []string
datetimePrecision []string
)
for _, c := range highestBucket.IndexColumn {
for _, c := range Selectivity.IndexColumn {
for _, p := range properties {
if strings.EqualFold(p["COLUMN_NAME"], c) {
columnProps = append(columnProps, p["DATA_TYPE"])
Expand All @@ -261,10 +261,10 @@ func (d *Database) GetDatabaseTableHighestSelectivityIndex(schemaNameS, tableNam
}
}

highestBucket.ColumnDatatype = columnProps
highestBucket.ColumnCollation = columnCollations
highestBucket.DatetimePrecision = datetimePrecision
return highestBucket, nil
Selectivity.ColumnDatatype = columnProps
Selectivity.ColumnCollation = columnCollations
Selectivity.DatetimePrecision = datetimePrecision
return Selectivity, nil
}

func (d *Database) GetDatabaseTableRandomValues(schemaNameS, tableNameS string, columns []string, conditions string, condArgs []interface{}, limit int, collations []string) ([][]string, error) {
Expand Down
86 changes: 78 additions & 8 deletions database/oracle/data_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,13 @@ func (d *Database) GetDatabaseDirectoryName(directory string) (string, error) {
return res[0]["DIRECTORY_PATH"], nil
}

func (d *Database) GetDatabaseTableChunkTask(taskName, schemaName, tableName string, chunkSize uint64, callTimeout uint64) ([]map[string]string, error) {
func (d *Database) GetDatabaseTableChunkTask(taskName, schemaName, tableName string, chunkSize uint64, callTimeout uint64, batchSize int, dataChan chan []map[string]string) error {
sqlStr00 := fmt.Sprintf(`BEGIN
DBMS_PARALLEL_EXECUTE.CREATE_TASK (TASK_NAME => '%s');
END;`, taskName)
_, err := d.ExecContext(d.Ctx, sqlStr00)
if err != nil {
return nil, fmt.Errorf("oracle DBMS_PARALLEL_EXECUTE create task failed: %v, sql: %v", err, sqlStr00)
return fmt.Errorf("oracle DBMS_PARALLEL_EXECUTE create task failed: %v, sql: %v", err, sqlStr00)
}

deadline := time.Now().Add(time.Duration(callTimeout) * time.Second)
Expand All @@ -181,21 +181,91 @@ END;`, taskName)
if err != nil {
_, err = d.ExecContext(d.Ctx, sqlStr02)
if err != nil {
return nil, fmt.Errorf("oracle DBMS_PARALLEL_EXECUTE create_chunks_by_rowid drop task failed: %v, sql: %v", err, sqlStr02)
return fmt.Errorf("oracle DBMS_PARALLEL_EXECUTE create_chunks_by_rowid drop task failed: %v, sql: %v", err, sqlStr02)
}
return nil, fmt.Errorf("oracle DBMS_PARALLEL_EXECUTE create_chunks_by_rowid task failed: %v, sql: %v", err, sqlStr01)
return fmt.Errorf("oracle DBMS_PARALLEL_EXECUTE create_chunks_by_rowid task failed: %v, sql: %v", err, sqlStr01)
}

sqlStr03 := fmt.Sprintf(`SELECT 'ROWID BETWEEN ''' || START_ROWID || ''' AND ''' || END_ROWID || '''' CMD FROM DBA_PARALLEL_EXECUTE_CHUNKS WHERE TASK_NAME = '%s' ORDER BY CHUNK_ID`, taskName)
_, res, err := d.GeneralQuery(sqlStr03)

batchRowsData := make([]map[string]string, 0, batchSize)

qdeadline := time.Now().Add(time.Duration(d.CallTimeout) * time.Second)

qctx, qcancel := context.WithDeadline(d.Ctx, qdeadline)
defer qcancel()

rows, err := d.QueryContext(qctx, sqlStr03)
if err != nil {
_, err = d.ExecContext(d.Ctx, sqlStr02)
if err != nil {
return fmt.Errorf("oracle DBMS_PARALLEL_EXECUTE query_chunks_rowid drop task failed: %v, sql: %v", err, sqlStr03)
}
return err
}
defer rows.Close()

// general query, automatic get column name
columns, err := rows.Columns()
if err != nil {
_, err = d.ExecContext(d.Ctx, sqlStr02)
if err != nil {
return nil, fmt.Errorf("oracle DBMS_PARALLEL_EXECUTE query_chunks_rowid drop task failed: %v, sql: %v", err, sqlStr02)
return fmt.Errorf("oracle DBMS_PARALLEL_EXECUTE query_chunks_rowid drop task failed: %v, sql: %v", err, sqlStr03)
}
return nil, fmt.Errorf("oracle DBMS_PARALLEL_EXECUTE query_chunks_rowid task failed: %v, sql: %v", err, sqlStr03)
return fmt.Errorf("query rows.Columns failed, sql: [%v], error: [%v]", sqlStr03, err)
}

values := make([][]byte, len(columns))
scans := make([]interface{}, len(columns))
for i := range values {
scans[i] = &values[i]
}
return res, nil

for rows.Next() {
err = rows.Scan(scans...)
if err != nil {
return fmt.Errorf("query rows.Scan failed, sql: [%v], error: [%v]", sqlStr03, err)
}

row := make(map[string]string)
for k, v := range values {
// Notes: oracle database NULL and ""
// 1, if the return value is NULLABLE, it represents the value is NULL, oracle sql query statement had be required the field NULL judgement, and if the filed is NULL, it returns that the value is NULLABLE
// 2, if the return value is nil, it represents the value is NULL
// 3, if the return value is "", it represents the value is "" string
// 4, if the return value is 'NULL' or 'null', it represents the value is NULL or null string
if v == nil {
row[columns[k]] = "NULLABLE"
} else {
// Handling empty string and other values, the return value output string
row[columns[k]] = stringutil.BytesToString(v)
}
}

// temporary array
batchRowsData = append(batchRowsData, row)

// batch
if len(batchRowsData) == batchSize {
dataChan <- batchRowsData
// clear
batchRowsData = make([]map[string]string, 0, batchSize)
}
}

if err = rows.Err(); err != nil {
_, err = d.ExecContext(d.Ctx, sqlStr02)
if err != nil {
return fmt.Errorf("oracle DBMS_PARALLEL_EXECUTE query_chunks_rowid drop task failed: %v, sql: %v", err, sqlStr03)
}
return fmt.Errorf("query rows.Next failed, sql: [%v], error: [%v]", sqlStr03, err)
}

// non-batch batch
if len(batchRowsData) > 0 {
dataChan <- batchRowsData
}
return nil
}

func (d *Database) GetDatabaseTableChunkData(querySQL string, queryArgs []interface{}, batchSize, callTimeout int, dbCharsetS, dbCharsetT, columnDetailO string, dataChan chan []interface{}) error {
Expand Down
Loading

0 comments on commit ca92e12

Please sign in to comment.