Skip to content

Commit

Permalink
Manually open transactions to enforce IMMEDIATE status.
Browse files Browse the repository at this point in the history
We don't trust the _txlock= parameter in the Open() call, because who
knows whether that is actually respected.
  • Loading branch information
LTLA committed Jan 24, 2025
1 parent 1844349 commit b231e27
Showing 1 changed file with 82 additions and 35 deletions.
117 changes: 82 additions & 35 deletions database.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"os"
"fmt"
"time"
"log"
"sync"
"errors"
"strings"
Expand All @@ -16,17 +17,62 @@ import (
_ "modernc.org/sqlite"
)

type ActiveTransaction struct {
type writeTransaction struct {
Conn *sql.Conn
Tx *sql.Tx
Committed bool
Ctx context.Context
}

func (t *ActiveTransaction) Finish() {
t.Tx.Rollback() // This is a no-op once committed.
t.Conn.Close()
func rollbackOrLog(conn *sql.Conn, ctx context.Context) {
_, err := conn.ExecContext(ctx, "ROLLBACK TRANSACTION")
if err != nil {
log.Printf("failed to rollback a transaction; %v\n", err)
}
}

func closeOrLog(conn *sql.Conn) {
err := conn.Close()
if err != nil {
log.Printf("failed to close the connection; %v\n", err)
}
}

func createTransaction(db *sql.DB) (*ActiveTransaction, error) {
func (wt *writeTransaction) Finish() {
if !wt.Committed {
rollbackOrLog(wt.Conn, wt.Ctx)
}
closeOrLog(wt.Conn)
}

func (wt *writeTransaction) Commit() error {
_, err := wt.Conn.ExecContext(wt.Ctx, "COMMIT TRANSACTION")
if err != nil {
return err
}
wt.Committed = true
return nil
}

func (wt *writeTransaction) Exec(query string, args ...any) (sql.Result, error) {
res, err := wt.Conn.ExecContext(wt.Ctx, query, args...)
return res, err
}

func (wt *writeTransaction) Prepare(query string) (*sql.Stmt, error) {
stmt, err := wt.Conn.PrepareContext(wt.Ctx, query)
return stmt, err
}

func (wt * writeTransaction) Query(query string, args ...any) (*sql.Rows, error) {
rows, err := wt.Conn.QueryContext(wt.Ctx, query, args...)
return rows, err
}

func (wt * writeTransaction) QueryRow(query string, args ...any) *sql.Row {
return wt.Conn.QueryRowContext(wt.Ctx, query, args...)
}

func createWriteTransaction(db *sql.DB) (*writeTransaction, error) {
ctx := context.Background()
success := false

Expand All @@ -40,7 +86,7 @@ func createTransaction(db *sql.DB) (*ActiveTransaction, error) {
}
defer func() {
if !success {
conn.Close()
closeOrLog(conn)
}
}()

Expand All @@ -65,18 +111,21 @@ func createTransaction(db *sql.DB) (*ActiveTransaction, error) {
return nil, fmt.Errorf("failed to enable normal synchronous mode; %w", err)
}

tx, err := conn.BeginTx(ctx, nil)
// We set IMMEDIATE transactions to make debugging of locking issues
// easier. This should not be a perf problem as 'db' should only have one
// connection anyway, we just eliminate the possibility of deadlocks.
_, err = conn.ExecContext(ctx, "BEGIN IMMEDIATE")
if err != nil {
return nil, fmt.Errorf("failed to create transaction; %w", err)
}
defer func() {
if !success {
tx.Rollback()
rollbackOrLog(conn, ctx)
}
}()

success = true;
return &ActiveTransaction{ Conn: conn, Tx: tx }, nil
success = true
return &writeTransaction{ Conn: conn, Committed: false, Ctx: ctx }, nil
}

func initializeDatabase(path string) (*sql.DB, error) {
Expand All @@ -85,9 +134,7 @@ func initializeDatabase(path string) (*sql.DB, error) {
accessible = true
}

// This object's connections are intended for serial writing, so we set
// IMMEDIATE transactions to make debugging of locking issues easier.
db, err := sql.Open("sqlite", path + "?_txlock=immediate")
db, err := sql.Open("sqlite", path)
if err != nil {
return nil, fmt.Errorf("failed to open read/write SQLite handle; %w", err)
}
Expand All @@ -98,13 +145,13 @@ func initializeDatabase(path string) (*sql.DB, error) {

if (!accessible) {
err := func () error {
atx, err := createTransaction(db)
atx, err := createWriteTransaction(db)
if err != nil {
return fmt.Errorf("failed to prepare transaction for table setup; %w", err)
}
defer atx.Finish()

_, err = atx.Tx.Exec(`
_, err = atx.Exec(`
CREATE TABLE dirs(
did INTEGER PRIMARY KEY,
path TEXT NOT NULL UNIQUE,
Expand Down Expand Up @@ -142,14 +189,14 @@ CREATE INDEX index_links ON links(tid, fid);
return fmt.Errorf("failed to create table in %q; %w", path, err)
}

err = atx.Tx.Commit()
err = atx.Commit()
if err != nil {
return fmt.Errorf("failed to commit table creation commands for %s; %w", path, err)
}

// Write-ahead logging is persistent and doesn't need to be set on every connection,
// see https://www.sqlite.org/wal.html#persistence_of_wal_mode.
_, err = atx.Conn.ExecContext(context.Background(), "PRAGMA journal_mode = WAL")
_, err = atx.Exec("PRAGMA journal_mode = WAL")
if err != nil {
return fmt.Errorf("failed to enable write-ahead logging; %w", err)
}
Expand Down Expand Up @@ -203,7 +250,7 @@ func (s *insertStatements) Close() {
}
}

func newInsertStatements(tx *sql.Tx) (*insertStatements, error) {
func newInsertStatements(tx *writeTransaction) (*insertStatements, error) {
output := &insertStatements{}
success := false
defer func() {
Expand Down Expand Up @@ -346,18 +393,18 @@ func tokenizeMetadata(parsed interface{}, path string, pid int64, field string,
/**********************************************************************/

func deleteDirectory(db *sql.DB, directory string) error {
atx, err := createTransaction(db)
atx, err := createWriteTransaction(db)
if err != nil {
return fmt.Errorf("failed to prepare transaction for deletion; %w", err)
}
defer atx.Finish()

_, err = atx.Tx.Exec("DELETE FROM dirs WHERE path == ?", directory)
_, err = atx.Exec("DELETE FROM dirs WHERE path == ?", directory)
if err != nil {
return fmt.Errorf("failed to delete %q; %w", directory, err)
}

err = atx.Tx.Commit()
err = atx.Commit()
if err != nil {
return fmt.Errorf("failed to commit deletion transaction for %q; %w", directory, err)
}
Expand All @@ -372,7 +419,7 @@ type FileInfoWithPath struct {
Info fs.FileInfo
}

func compareToExistingPaths(tx *sql.Tx, did int64, all_paths map[string]fs.FileInfo) ([]*FileInfoWithPath, []*FileInfoWithPath, []string, error) {
func compareToExistingPaths(tx *writeTransaction, did int64, all_paths map[string]fs.FileInfo) ([]*FileInfoWithPath, []*FileInfoWithPath, []string, error) {
rows, err := tx.Query("SELECT path, time from paths WHERE did = ?", did)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to query the 'paths' table; %w", err)
Expand Down Expand Up @@ -415,7 +462,7 @@ func compareToExistingPaths(tx *sql.Tx, did int64, all_paths map[string]fs.FileI
return new_paths, update_paths, purge_paths, nil
}

func addDirectoryContents(tx *sql.Tx, path string, did int64, base_names []string, tokenizer* unicodeTokenizer) ([]string, error) {
func addDirectoryContents(tx *writeTransaction, path string, did int64, base_names []string, tokenizer* unicodeTokenizer) ([]string, error) {
all_failures := []string{}

dir_contents, dir_failures := listMetadata(path, base_names)
Expand Down Expand Up @@ -557,17 +604,17 @@ func addNewDirectory(db *sql.DB, path string, base_names []string, user string,
return nil, fmt.Errorf("failed to encode names as JSON; %w", err)
}

atx, err := createTransaction(db)
atx, err := createWriteTransaction(db)
if err != nil {
return nil, fmt.Errorf("failed to prepare transaction for adding a new directory; %w", err)
}
defer atx.Finish()

var did int64
row := atx.Tx.QueryRow("SELECT did FROM dirs WHERE path = ?", path)
row := atx.QueryRow("SELECT did FROM dirs WHERE path = ?", path)
err = row.Scan(&did)
if errors.Is(err, sql.ErrNoRows) {
err = atx.Tx.QueryRow(
err = atx.QueryRow(
"INSERT INTO dirs(path, user, time, names) VALUES(?, ?, ?, ?) RETURNING did",
path,
user,
Expand All @@ -576,7 +623,7 @@ func addNewDirectory(db *sql.DB, path string, base_names []string, user string,
).Scan(&did)

} else {
_, err = atx.Tx.Exec(
_, err = atx.Exec(
"UPDATE dirs SET user = ?, time = ?, names = ? WHERE did = ?",
user,
time.Now().Unix(),
Expand All @@ -589,9 +636,9 @@ func addNewDirectory(db *sql.DB, path string, base_names []string, user string,
return nil, fmt.Errorf("failed to insert new directory; %w", err)
}

failures, err := addDirectoryContents(atx.Tx, path, did, base_names, tokenizer)
failures, err := addDirectoryContents(atx, path, did, base_names, tokenizer)

err = atx.Tx.Commit()
err = atx.Commit()
if err != nil {
return nil, fmt.Errorf("failed to commit the transaction to add a new directory; %w", err)
}
Expand All @@ -607,7 +654,7 @@ type registeredDirectory struct {
Names []string
}

func listDirectories(tx *sql.Tx) ([]*registeredDirectory, error) {
func listDirectories(tx *writeTransaction) ([]*registeredDirectory, error) {
rows, err := tx.Query("SELECT did, path, names from dirs")
if err != nil {
return nil, fmt.Errorf("failed to query the 'dirs' table; %w", err)
Expand Down Expand Up @@ -636,27 +683,27 @@ func listDirectories(tx *sql.Tx) ([]*registeredDirectory, error) {
}

func updateDirectories(db *sql.DB, tokenizer* unicodeTokenizer) ([]string, error) {
atx, err := createTransaction(db)
atx, err := createWriteTransaction(db)
if err != nil {
return nil, fmt.Errorf("failed to prepare transaction for update; %w", err)
}
defer atx.Finish()

all_dirs, err := listDirectories(atx.Tx)
all_dirs, err := listDirectories(atx)
if err != nil {
return nil, err
}

all_failures := []string{}
for _, d := range all_dirs {
curfailures, err := addDirectoryContents(atx.Tx, d.Path, d.Id, d.Names, tokenizer)
curfailures, err := addDirectoryContents(atx, d.Path, d.Id, d.Names, tokenizer)
if err != nil {
return nil, err
}
all_failures = append(all_failures, curfailures...)
}

err = atx.Tx.Commit()
err = atx.Commit()
if err != nil {
return nil, fmt.Errorf("failed to commit the transaction to update directories; %w", err)
}
Expand Down

0 comments on commit b231e27

Please sign in to comment.