Skip to content

Commit

Permalink
Small fixes & cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
cristaloleg committed Jun 25, 2022
1 parent 89fd916 commit 8ede715
Show file tree
Hide file tree
Showing 9 changed files with 637 additions and 16 deletions.
24 changes: 8 additions & 16 deletions dbump.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,6 @@ type Migration struct {
Revert string // Revert query
}

// MigrationFn gives ability to use Go functions as migrations.
type MigrationFn func(ctx context.Context, conn Conn) error

// Conn represents a connection to the database.
type Conn interface {
Exec(ctx context.Context, query string, args ...interface{}) error
}

// MigratorMode to change migration flow.
type MigratorMode int

Expand All @@ -89,7 +81,7 @@ const (
modeMaxPossible
)

// AsLocklessMigrator makes given migrator to not take a lock in database.
// AsLocklessMigrator makes given migrator to not take a lock on database.
func AsLocklessMigrator(m Migrator) Migrator {
return &locklessMigrator{m}
}
Expand Down Expand Up @@ -126,7 +118,7 @@ func (m *mig) run(ctx context.Context) error {
if err != nil {
return fmt.Errorf("load: %w", err)
}
return m.runMigration(ctx, migrations)
return m.runMigrations(ctx, migrations)
}

func (m *mig) load() ([]*Migration, error) {
Expand All @@ -152,7 +144,7 @@ func (m *mig) load() ([]*Migration, error) {
return ms, nil
}

func (m *mig) runMigration(ctx context.Context, ms []*Migration) (err error) {
func (m *mig) runMigrations(ctx context.Context, ms []*Migration) (err error) {
if err := m.Init(ctx); err != nil {
return fmt.Errorf("init: %w", err)
}
Expand All @@ -174,11 +166,11 @@ func (m *mig) runMigration(ctx context.Context, ms []*Migration) (err error) {
}
}()

err = m.runMigrationLocked(ctx, ms)
err = m.runMigrationsLocked(ctx, ms)
return err
}

func (m *mig) runMigrationLocked(ctx context.Context, ms []*Migration) error {
func (m *mig) runMigrationsLocked(ctx context.Context, ms []*Migration) error {
curr, target, err := m.getCurrAndTargetVersions(ctx, len(ms))
if err != nil {
return err
Expand Down Expand Up @@ -330,9 +322,9 @@ func (llm *locklessMigrator) SetVersion(ctx context.Context, version int) error
return llm.m.SetVersion(ctx, version)
}

func (llm *locklessMigrator) Begin(ctx context.Context) error { return llm.Begin(ctx) }
func (llm *locklessMigrator) Commit(ctx context.Context) error { return llm.Commit(ctx) }
func (llm *locklessMigrator) Rollback(ctx context.Context) error { return llm.Rollback(ctx) }
func (llm *locklessMigrator) Begin(ctx context.Context) error { return llm.m.Begin(ctx) }
func (llm *locklessMigrator) Commit(ctx context.Context) error { return llm.m.Commit(ctx) }
func (llm *locklessMigrator) Rollback(ctx context.Context) error { return llm.m.Rollback(ctx) }

func (llm *locklessMigrator) Exec(ctx context.Context, query string, args ...interface{}) error {
return llm.m.Exec(ctx, query, args...)
Expand Down
51 changes: 51 additions & 0 deletions dbump_ch/clickhouse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package dbump_ch

import (
"context"
"database/sql"
)

// MigratorClickHouse to migrate ClickHouse.
type MigratorClickHouse struct {
db *sql.DB
versionTable string
}

// NewMigratorClickHouse instantiates new MigratorClickHouse.
func NewMigratorClickHouse(db *sql.DB) *MigratorClickHouse {
return &MigratorClickHouse{
db: db,
versionTable: "_dbump_schema_version",
}
}

// LockDB is a method for Migrator interface.
func (ch *MigratorClickHouse) LockDB(ctx context.Context) error {
// TODO: currently no-op
return nil
}

// UnlockDB is a method for Migrator interface.
func (ch *MigratorClickHouse) UnlockDB(ctx context.Context) error {
// TODO: currently no-op
return nil
}

// Version is a method for Migrator interface.
func (ch *MigratorClickHouse) Version(ctx context.Context) (version int, err error) {
row := ch.db.QueryRowContext(ctx, "SELECT version FROM "+ch.versionTable)
err = row.Scan(&version)
return version, err
}

// SetVersion is a method for Migrator interface.
func (ch *MigratorClickHouse) SetVersion(ctx context.Context, version int) error {
_, err := ch.db.ExecContext(ctx, "UPDATE "+ch.versionTable+" SET version = $1", version)
return err
}

// Exec is a method for Migrator interface.
func (ch *MigratorClickHouse) Exec(ctx context.Context, query string, args ...interface{}) error {
_, err := ch.db.ExecContext(ctx, query)
return err
}
7 changes: 7 additions & 0 deletions dbump_ch/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module github.com/cristalhq/dbump/dbump_ch

go 1.16

require (
github.com/cristalhq/dbump v0.1.1
)
2 changes: 2 additions & 0 deletions dbump_ch/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
github.com/cristalhq/dbump v0.1.1 h1:cRbnpydrF19ML0wNO2pGsY6qwEVH/rtwnN64VwZpy9M=
github.com/cristalhq/dbump v0.1.1/go.mod h1:rAjULuStbuNPCLrJT62Eu7Sp/2gVt/4URUvsnPK1yFA=
8 changes: 8 additions & 0 deletions dbump_mysql/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
module github.com/cristalhq/dbump/dbump_mysql

go 1.16

require (
github.com/cristalhq/dbump v0.1.1
github.com/lib/pq v1.10.5
)
4 changes: 4 additions & 0 deletions dbump_mysql/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
github.com/cristalhq/dbump v0.1.1 h1:cRbnpydrF19ML0wNO2pGsY6qwEVH/rtwnN64VwZpy9M=
github.com/cristalhq/dbump v0.1.1/go.mod h1:rAjULuStbuNPCLrJT62Eu7Sp/2gVt/4URUvsnPK1yFA=
github.com/lib/pq v1.10.5 h1:J+gdV2cUmX7ZqL2B0lFcW0m+egaHC2V3lpO8nWxyYiQ=
github.com/lib/pq v1.10.5/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
65 changes: 65 additions & 0 deletions dbump_mysql/mysql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package dbump_mysql

import (
"context"
"database/sql"
"fmt"
)

// to prevent multiple migrations running at the same time
const lockNum int64 = 777_777_777

// Migrator to migrate MySQL.
type Migrator struct {
db *sql.DB
versionTable string
}

// NewMigrator instantiates new Migrator.
func NewMigrator(db *sql.DB) *Migrator {
return &Migrator{
db: db,
versionTable: "_dbump_schema_version",
}
}

// Init migrator.
func (pg *Migrator) Init(ctx context.Context) error {
query := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (
version BIGINT NOT NULL PRIMARY KEY,
created_at TIMESTAMP NOT NULL
);`, pg.versionTable)
_, err := pg.db.ExecContext(ctx, query)
return err
}

// LockDB is a method for Migrator interface.
func (my *Migrator) LockDB(ctx context.Context) error {
_, err := my.db.ExecContext(ctx, `SELECT GET_LOCK(?, 10)`, lockNum)
return err
}

// UnlockDB is a method for Migrator interface.
func (my *Migrator) UnlockDB(ctx context.Context) error {
_, err := my.db.ExecContext(ctx, "SELECT RELEASE_LOCK(?)", lockNum)
return err
}

// Version is a method for Migrator interface.
func (my *Migrator) Version(ctx context.Context) (version int, err error) {
row := my.db.QueryRowContext(ctx, "SELECT version FROM "+my.versionTable)
err = row.Scan(&version)
return version, err
}

// SetVersion is a method for Migrator interface.
func (my *Migrator) SetVersion(ctx context.Context, version int) error {
_, err := my.db.ExecContext(ctx, "UPDATE "+my.versionTable+" SET version = $1", version)
return err
}

// Exec is a method for Migrator interface.
func (my *Migrator) Exec(ctx context.Context, query string, args ...interface{}) error {
_, err := my.db.ExecContext(ctx, query)
return err
}
7 changes: 7 additions & 0 deletions dbump_mysql/mysql_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package dbump_mysql_test

import (
"context"
"database/sql"
"testing"
)
Loading

0 comments on commit 8ede715

Please sign in to comment.