Skip to content

Commit

Permalink
Introduce DoStep
Browse files Browse the repository at this point in the history
  • Loading branch information
cristaloleg committed Jun 30, 2022
1 parent 176e818 commit 03a4023
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 271 deletions.
113 changes: 30 additions & 83 deletions dbump.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,28 +51,24 @@ type Migrator interface {
LockDB(ctx context.Context) error
// UnlockDB to allow running other migrators later.
UnlockDB(ctx context.Context) error

// Init the dbump database where database state is saved.
// What is created by this method completely depends on migrator implementation
// and might be different between databases.
Init(ctx context.Context) error

// Version of the migration. Used only once in the beginning.
Version(ctx context.Context) (version int, err error)
// SetVersion is run after each migration.
SetVersion(ctx context.Context, version int) error

// Begin the transaction before migration.
// Might be no-op if DisableTx is set or transaction are not supported by database.
Begin(ctx context.Context) error
// Commit the transaction after migration.
// Might be no-op if DisableTx is set or transaction are not supported by database.
Commit(ctx context.Context) error
// Rollback the transaction on migration fail.
// Might be no-op if DisableTx is set or transaction are not supported by database.
Rollback(ctx context.Context) error

// Exec the given query and params.
Exec(ctx context.Context, query string, args ...interface{}) error

// DoStep runs the given query and sets a new version on success.
DoStep(ctx context.Context, step Step) error
}

// Step represents exact thing that is going to run.
type Step struct {
Version int
Query string
DisableTx bool
}

// Loader returns migrations to be applied on a database.
Expand Down Expand Up @@ -197,50 +193,13 @@ func (m *mig) runMigrationsLocked(ctx context.Context, ms []*Migration) error {
}

for _, step := range m.prepareSteps(curr, target, ms) {
if err := m.execStep(ctx, step); err != nil {
if err := m.DoStep(ctx, step); err != nil {
return err
}
}
return nil
}

func (m *mig) execStep(ctx context.Context, step step) error {
if m.Config.DisableTx {
return m.execSimpleStep(ctx, step)
}
return m.execStepSafely(ctx, step)
}

func (m *mig) execStepSafely(ctx context.Context, step step) (err error) {
if err := m.Begin(ctx); err != nil {
return fmt.Errorf("begin tx: %w", err)
}

defer func() {
if err != nil {
if errRollback := m.Rollback(ctx); errRollback != nil {
err = fmt.Errorf("(rollback tx: %v): %w", errRollback, err)
}
}
}()

err = m.execSimpleStep(ctx, step)
if err == nil {
err = m.Commit(ctx)
}
return err
}

func (m *mig) execSimpleStep(ctx context.Context, step step) error {
if err := m.Exec(ctx, step.Query); err != nil {
return fmt.Errorf("exec: %w", err)
}
if err := m.SetVersion(ctx, step.Version); err != nil {
return fmt.Errorf("set version: %w", err)
}
return nil
}

func (m *mig) getCurrAndTargetVersions(ctx context.Context, migrations int) (curr, target int, err error) {
curr, err = m.Version(ctx)
if err != nil {
Expand Down Expand Up @@ -278,11 +237,11 @@ func (m *mig) getCurrAndTargetVersions(ctx context.Context, migrations int) (cur
return curr, target, nil
}

func (m *mig) prepareSteps(curr, target int, ms []*Migration) []step {
func (m *mig) prepareSteps(curr, target int, ms []*Migration) []Step {
if curr == target {
return nil
}
steps := []step{}
steps := []Step{}

direction := 1
if curr > target {
Expand All @@ -296,56 +255,44 @@ func (m *mig) prepareSteps(curr, target int, ms []*Migration) []step {
idx--
}

steps = append(steps, ms[idx].toStep(isUp))
steps = append(steps, ms[idx].toStep(isUp, m.DisableTx))
if m.ZigZag {
steps = append(steps,
ms[idx].toStep(!isUp),
ms[idx].toStep(isUp))
ms[idx].toStep(!isUp, m.DisableTx),
ms[idx].toStep(isUp, m.DisableTx))
}
}
return steps
}

type step struct {
Version int
IsQuery bool
Query string
}

func (m *Migration) toStep(up bool) step {
func (m *Migration) toStep(up, disableTx bool) Step {
if up {
return step{
Version: m.ID,
IsQuery: m.Apply != "",
Query: m.Apply,
return Step{
Version: m.ID,
Query: m.Apply,
DisableTx: disableTx,
}
}
return step{
Version: m.ID - 1,
IsQuery: m.Revert != "",
Query: m.Revert,
return Step{
Version: m.ID - 1,
Query: m.Revert,
DisableTx: disableTx,
}
}

type locklessMigrator struct {
m Migrator
}

func (llm *locklessMigrator) Init(ctx context.Context) error { return llm.m.Init(ctx) }
func (llm *locklessMigrator) LockDB(ctx context.Context) error { return nil }
func (llm *locklessMigrator) UnlockDB(ctx context.Context) error { return nil }

func (llm *locklessMigrator) Init(ctx context.Context) error { return llm.m.Init(ctx) }

func (llm *locklessMigrator) Version(ctx context.Context) (version int, err error) {
return llm.m.Version(ctx)
}
func (llm *locklessMigrator) SetVersion(ctx context.Context, version int) error {
return llm.m.SetVersion(ctx, version)
}

func (llm *locklessMigrator) Begin(ctx context.Context) error { return llm.m.Begin(ctx) }
func (llm *locklessMigrator) Commit(ctx context.Context) error { return llm.m.Commit(ctx) }
func (llm *locklessMigrator) Rollback(ctx context.Context) error { return llm.m.Rollback(ctx) }

func (llm *locklessMigrator) Exec(ctx context.Context, query string, args ...interface{}) error {
return llm.m.Exec(ctx, query, args...)
func (llm *locklessMigrator) DoStep(ctx context.Context, step Step) error {
return llm.m.DoStep(ctx, step)
}
Loading

0 comments on commit 03a4023

Please sign in to comment.