Skip to content

Commit

Permalink
Merge pull request #561 from cybertec-postgresql/560-txid-over-max-int4
Browse files Browse the repository at this point in the history
[-] alter timetable.execution_log.txid column type to bigint, fixes #560
  • Loading branch information
pashagolub authored Apr 20, 2023
2 parents f391cfe + 7b6c853 commit 010be60
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 18 deletions.
6 changes: 6 additions & 0 deletions internal/pgengine/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,12 @@ var Migrations func() migrator.Option = func() migrator.Option {
return ExecuteMigrationScript(ctx, tx, "00534.sql")
},
},
&migrator.Migration{
Name: "00560 Alter txid column to bigint",
Func: func(ctx context.Context, tx pgx.Tx) error {
return ExecuteMigrationScript(ctx, tx, "00560.sql")
},
},
// adding new migration here, update "timetable"."migration" in "sql/init.sql"
// and "dbapi" variable in main.go!

Expand Down
6 changes: 3 additions & 3 deletions internal/pgengine/pgengine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestSchedulerFunctions(t *testing.T) {
var chains []pgengine.ChainTask
tx, txid, err := pge.StartTransaction(ctx, 0)
assert.NoError(t, err, "Should start transaction")
assert.Greater(t, txid, 0, "Should return transaction id")
assert.Greater(t, txid, int64(0), "Should return transaction id")
assert.NoError(t, pge.GetChainElements(ctx, &chains, 0), "Should no error in clean database")
assert.Empty(t, chains, "Should be empty in clean database")
pge.CommitTransaction(ctx, tx)
Expand All @@ -152,7 +152,7 @@ func TestSchedulerFunctions(t *testing.T) {
var paramVals []string
tx, txid, err := pge.StartTransaction(ctx, 0)
assert.NoError(t, err, "Should start transaction")
assert.Greater(t, txid, 0, "Should return transaction id")
assert.Greater(t, txid, int64(0), "Should return transaction id")
assert.NoError(t, pge.GetChainParamValues(ctx, &paramVals, &pgengine.ChainTask{
TaskID: 0,
ChainID: 0}), "Should no error in clean database")
Expand All @@ -170,7 +170,7 @@ func TestSchedulerFunctions(t *testing.T) {
t.Run("Check ExecuteSQLCommand function", func(t *testing.T) {
tx, txid, err := pge.StartTransaction(ctx, 0)
assert.NoError(t, err, "Should start transaction")
assert.Greater(t, txid, 0, "Should return transaction id")
assert.Greater(t, txid, int64(0) , "Should return transaction id")
f := func(sql string, params []string) error {
_, err := pge.ExecuteSQLCommand(ctx, tx, sql, params)
return err
Expand Down
2 changes: 1 addition & 1 deletion internal/pgengine/sql/ddl.sql
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ COMMENT ON TABLE timetable.log IS
CREATE TABLE timetable.execution_log (
chain_id BIGINT,
task_id BIGINT,
txid INTEGER NOT NULL,
txid BIGINT NOT NULL,
last_run TIMESTAMPTZ DEFAULT now(),
finished TIMESTAMPTZ,
pid BIGINT,
Expand Down
21 changes: 11 additions & 10 deletions internal/pgengine/sql/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ CREATE TABLE timetable.migration(
INSERT INTO
timetable.migration (id, version)
VALUES
(0, '00259 Restart migrations for v4'),
(1, '00305 Fix timetable.is_cron_in_time'),
(2, '00323 Append timetable.delete_job function'),
(3, '00329 Migration required for some new added functions'),
(4, '00334 Refactor timetable.task as plain schema without tree-like dependencies'),
(5, '00381 Rewrite active chain handling'),
(6, '00394 Add started_at column to active_session and active_chain tables'),
(7, '00417 Rename LOG database log level to INFO'),
(8, '00436 Add txid column to timetable.execution_log'),
(9, '00534 Use cron_split_to_arrays() in cron domain check');
(0, '00259 Restart migrations for v4'),
(1, '00305 Fix timetable.is_cron_in_time'),
(2, '00323 Append timetable.delete_job function'),
(3, '00329 Migration required for some new added functions'),
(4, '00334 Refactor timetable.task as plain schema without tree-like dependencies'),
(5, '00381 Rewrite active chain handling'),
(6, '00394 Add started_at column to active_session and active_chain tables'),
(7, '00417 Rename LOG database log level to INFO'),
(8, '00436 Add txid column to timetable.execution_log'),
(9, '00534 Use cron_split_to_arrays() in cron domain check'),
(10, '00560 Alter txid column to bigint');
2 changes: 2 additions & 0 deletions internal/pgengine/sql/migrations/00560.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE timetable.execution_log
ALTER txid TYPE BIGINT;
4 changes: 2 additions & 2 deletions internal/pgengine/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ type ChainTask struct {
Timeout int `db:"timeout"` // in milliseconds
StartedAt time.Time `db:"-"`
Duration int64 `db:"-"` // in microseconds
Txid int `db:"-"`
Txid int64 `db:"-"`
}

// StartTransaction returns transaction object, transaction id and error
func (pge *PgEngine) StartTransaction(ctx context.Context, chainID int) (tx pgx.Tx, txid int, err error) {
func (pge *PgEngine) StartTransaction(ctx context.Context, chainID int) (tx pgx.Tx, txid int64, err error) {
tx, err = pge.ConfigDb.Begin(ctx)
if err != nil {
return
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (sch *Scheduler) executeChain(ctx context.Context, chain Chain) {
var ChainTasks []pgengine.ChainTask
var bctx context.Context
var cancel context.CancelFunc
var txid int
var txid int64

ctx, cancel = getTimeoutContext(ctx, sch.Config().Resource.ChainTimeout, chain.Timeout)
if cancel != nil {
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ var (
commit = "000000"
version = "master"
date = "unknown"
dbapi = "00534"
dbapi = "00560"
)

func printVersion() {
Expand Down

0 comments on commit 010be60

Please sign in to comment.