From 11192c362e234cfa35a4e8906e0083e0ccefe6d1 Mon Sep 17 00:00:00 2001 From: Peter Chen <34582813+PeterChen13579@users.noreply.github.com> Date: Wed, 9 Oct 2024 12:11:55 -0400 Subject: [PATCH] fix: deletion script will not OOM (#1679) fixes #1676 and #1678 --- .../cassandra_delete_range.go | 120 ++++- .../internal/cass/cass.go | 468 +++++++++++++----- .../internal/util/util.go | 6 + 3 files changed, 461 insertions(+), 133 deletions(-) mode change 100644 => 100755 tools/cassandra_delete_range/cassandra_delete_range.go diff --git a/tools/cassandra_delete_range/cassandra_delete_range.go b/tools/cassandra_delete_range/cassandra_delete_range.go old mode 100644 new mode 100755 index c5941b9a2..71c6f6141 --- a/tools/cassandra_delete_range/cassandra_delete_range.go +++ b/tools/cassandra_delete_range/cassandra_delete_range.go @@ -5,9 +5,11 @@ package main import ( + "bufio" "fmt" "log" "os" + "strconv" "strings" "time" "xrplf/clio/cassandra_delete_range/internal/cass" @@ -33,7 +35,7 @@ var ( deleteBefore = app.Command("delete-before", "Prunes everything before the given ledger index") deleteBeforeLedgerIdx = deleteBefore.Arg("idx", "Sets the latest ledger_index to keep around (delete everything before this ledger index)").Required().Uint64() - getLedgerRange = app.Command("get-ledger-range", "Fetch the current lender_range table values") + getLedgerRange = app.Command("get-ledger-range", "Fetch the current ledger_range table values") nodesInCluster = app.Flag("nodes-in-cluster", "Number of nodes in your Scylla cluster").Short('n').Default(fmt.Sprintf("%d", defaultNumberOfNodesInCluster)).Int() coresInNode = app.Flag("cores-in-node", "Number of cores in each node").Short('c').Default(fmt.Sprintf("%d", defaultNumberOfCoresInNode)).Int() @@ -44,6 +46,7 @@ var ( clusterCQLVersion = app.Flag("cql-version", "The CQL version to use").Short('l').Default("3.0.0").String() clusterPageSize = app.Flag("cluster-page-size", "Page size of results").Short('p').Default("5000").Int() keyspace = app.Flag("keyspace", "Keyspace to use").Short('k').Default("clio_fh").String() + resume = app.Flag("resume", "Whether to resume deletion from the previous command due to something crashing").Default("false").Bool() userName = app.Flag("username", "Username to use when connecting to the cluster").String() password = app.Flag("password", "Password to use when connecting to the cluster").String() @@ -56,9 +59,11 @@ var ( skipLedgerTransactionsTable = app.Flag("skip-ledger-transactions", "Whether to skip deletion from ledger_transactions table").Default("false").Bool() skipLedgersTable = app.Flag("skip-ledgers", "Whether to skip deletion from ledgers table").Default("false").Bool() skipWriteLatestLedger = app.Flag("skip-write-latest-ledger", "Whether to skip writing the latest ledger index").Default("false").Bool() + skipAccTransactionsTable = app.Flag("skip-account-transactions", "Whether to skip deletion from account_transactions table").Default("false").Bool() - workerCount = 1 // the calculated number of parallel goroutines the client should run - ranges []*util.TokenRange // the calculated ranges to be executed in parallel + workerCount = 1 // the calculated number of parallel goroutines the client should run + ranges []*util.TokenRange // the calculated ranges to be executed in parallel + ledgerOrTokenRange *util.StoredRange // mapping of startRange -> endRange. Used for resume deletion ) func main() { @@ -70,6 +75,11 @@ func main() { log.Fatal(err) } + cmd := strings.Join(os.Args[1:], " ") + if *resume { + prepareResume(&cmd) + } + clioCass := cass.NewClioCass(&cass.Settings{ SkipSuccessorTable: *skipSuccessorTable, SkipObjectsTable: *skipObjectsTable, @@ -79,8 +89,11 @@ func main() { SkipLedgerTransactionsTable: *skipLedgerHashesTable, SkipLedgersTable: *skipLedgersTable, SkipWriteLatestLedger: *skipWriteLatestLedger, + SkipAccTransactionsTable: *skipAccTransactionsTable, WorkerCount: workerCount, - Ranges: ranges}, cluster) + Ranges: ranges, + RangesRead: ledgerOrTokenRange, + Command: cmd}, cluster) switch command { case deleteAfter.FullCommand(): @@ -157,6 +170,7 @@ Skip deletion of: - diff table : %t - ledger_transactions table : %t - ledgers table : %t +- account_tx table : %t Will update ledger_range : %t @@ -179,7 +193,9 @@ Will update ledger_range : %t *skipDiffTable, *skipLedgerTransactionsTable, *skipLedgersTable, - !*skipWriteLatestLedger) + *skipAccTransactionsTable, + !*skipWriteLatestLedger, + ) fmt.Println(runParameters) } @@ -208,3 +224,97 @@ func prepareDb(dbHosts *string) (*gocql.ClusterConfig, error) { return cluster, nil } + +func prepareResume(cmd *string) { + // format of file continue.txt is + /* + Previous user command (must match the same command to resume deletion) + Table name (ie. objects, ledger_hashes etc) + Values of token_ranges (each pair of values seperated line by line) + */ + + file, err := os.Open("continue.txt") + if err != nil { + log.Fatal("continue.txt does not exist. Aborted") + } + defer file.Close() + + if err != nil { + log.Fatalf("Failed to open file: %v", err) + } + scanner := bufio.NewScanner(file) + scanner.Scan() + + // --resume must be last flag passed; so can check command matches + if os.Args[len(os.Args)-1] != "--resume" { + log.Fatal("--resume must be the last flag passed") + } + + // get rid of --resume at the end + *cmd = strings.Join(os.Args[1:len(os.Args)-1], " ") + + // makes sure command that got aborted matches the user command they enter + if scanner.Text() != *cmd { + log.Fatalf("File continue.txt has %s command stored. \n You provided %s which does not match. \n Aborting...", scanner.Text(), *cmd) + } + + scanner.Scan() + // skip the neccessary tables based on where the program aborted + // for example if account_tx, all tables before account_tx + // should be already deleted so we skip for deletion + tableFound := false + switch scanner.Text() { + case "account_tx": + *skipLedgersTable = true + fallthrough + case "ledgers": + *skipLedgerTransactionsTable = true + fallthrough + case "ledger_transactions": + *skipDiffTable = true + fallthrough + case "diff": + *skipTransactionsTable = true + fallthrough + case "transactions": + *skipLedgerHashesTable = true + fallthrough + case "ledger_hashes": + *skipObjectsTable = true + fallthrough + case "objects": + *skipSuccessorTable = true + fallthrough + case "successor": + tableFound = true + } + + if !tableFound { + log.Fatalf("Invalid table: %s", scanner.Text()) + } + + scanner.Scan() + rangeRead := make(map[int64]int64) + + // now go through all the ledger range and load it to a set + for scanner.Scan() { + line := scanner.Text() + tokenRange := strings.Split(line, ",") + if len(tokenRange) != 2 { + log.Fatalf("Range is not two integers. %s . Aborting...", tokenRange) + } + startStr := strings.TrimSpace(tokenRange[0]) + endStr := strings.TrimSpace(tokenRange[1]) + + // convert string to int64 + start, err1 := strconv.ParseInt(startStr, 10, 64) + end, err2 := strconv.ParseInt(endStr, 10, 64) + + if err1 != nil || err2 != nil { + log.Fatalf("Error converting integer: %s, %s", err1, err2) + } + rangeRead[start] = end + } + ledgerOrTokenRange = &util.StoredRange{} + ledgerOrTokenRange.TokenRange = rangeRead +} diff --git a/tools/cassandra_delete_range/internal/cass/cass.go b/tools/cassandra_delete_range/internal/cass/cass.go index f60fe482a..18275192c 100644 --- a/tools/cassandra_delete_range/internal/cass/cass.go +++ b/tools/cassandra_delete_range/internal/cass/cass.go @@ -21,8 +21,9 @@ type deleteInfo struct { } type deleteParams struct { - Seq uint64 - Blob []byte // hash, key, etc + Seq uint64 + Blob []byte // hash, key, etc + tnxIndex uint64 //transaction index } type columnSettings struct { @@ -30,6 +31,12 @@ type columnSettings struct { UseBlob bool } +type deleteMethod struct { + deleteObject maybe.Maybe[bool] + deleteTransaction maybe.Maybe[bool] + deleteGeneral maybe.Maybe[bool] +} + type Settings struct { SkipSuccessorTable bool SkipObjectsTable bool @@ -39,9 +46,50 @@ type Settings struct { SkipLedgerTransactionsTable bool SkipLedgersTable bool SkipWriteLatestLedger bool + SkipAccTransactionsTable bool WorkerCount int Ranges []*util.TokenRange + RangesRead *util.StoredRange // Used to resume deletion + Command string +} + +type Marker struct { + cmd string + file *os.File +} + +func NewMarker(cmd string) *Marker { + return &Marker{cmd: cmd} +} + +func CloseMarker(m *Marker) { + if m.file != nil { + m.file.Close() + } + os.Remove("continue.txt") +} + +func (m *Marker) EnterTable(table string) error { + // Create the file + file, err := os.OpenFile("continue.txt", os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644) + m.file = file + if err != nil { + return fmt.Errorf("failed to create file: %w", err) + } + fmt.Fprintf(m.file, "%s\n", m.cmd) + m.file.WriteString(fmt.Sprintf("%s\n", table)) + return nil +} + +func (m *Marker) MarkProgress(x int64, y int64) { + fmt.Fprintf(m.file, "%d, %d \n", x, y) +} + +func (m *Marker) ExitTable() { + m.file.Close() + m.file = nil + os.Remove("continue.txt") } type Cass interface { @@ -67,7 +115,7 @@ func (c *ClioCass) DeleteBefore(ledgerIdx uint64) { log.Printf("DB ledger range is %d -> %d\n", firstLedgerIdxInDB, latestLedgerIdxInDB) - if firstLedgerIdxInDB > ledgerIdx { + if firstLedgerIdxInDB >= ledgerIdx { log.Fatal("Earliest ledger index in DB is greater than the one specified. Aborting...") } @@ -98,7 +146,7 @@ func (c *ClioCass) DeleteAfter(ledgerIdx uint64) { log.Fatal("Earliest ledger index in DB is greater than the one specified. Aborting...") } - if latestLedgerIdxInDB < ledgerIdx { + if latestLedgerIdxInDB <= ledgerIdx { log.Fatal("Latest ledger index in DB is smaller than the one specified. Aborting...") } @@ -146,7 +194,6 @@ func (c *ClioCass) pruneData( var totalRows uint64 var totalDeletes uint64 - var info deleteInfo var rowsCount uint64 var deleteCount uint64 var errCount uint64 @@ -180,100 +227,156 @@ func (c *ClioCass) pruneData( // successor queries if !c.settings.SkipSuccessorTable { + marker := NewMarker(c.settings.Command) + if err := marker.EnterTable("successor"); err != nil { + return err + } + log.Println("Generating delete queries for successor table") - info, rowsCount, errCount = c.prepareDeleteQueries(fromLedgerIdx, toLedgerIdx, + rowsCount, deleteCount, errCount = c.prepareAndExecuteDeleteQueries(marker, fromLedgerIdx, toLedgerIdx, "SELECT key, seq FROM successor WHERE token(key) >= ? AND token(key) <= ?", - "DELETE FROM successor WHERE key = ? AND seq = ?", false) - log.Printf("Total delete queries: %d\n", len(info.Data)) + "DELETE FROM successor WHERE key = ? AND seq = ?", deleteMethod{deleteGeneral: maybe.Set(true)}, columnSettings{UseBlob: true, UseSeq: false}) + log.Printf("Total delete queries: %d\n", deleteCount) log.Printf("Total traversed rows: %d\n\n", rowsCount) - totalErrors += errCount totalRows += rowsCount - deleteCount, errCount = c.performDeleteQueries(&info, columnSettings{UseBlob: true, UseSeq: true}) totalErrors += errCount totalDeletes += deleteCount + + marker.ExitTable() } // objects queries if !c.settings.SkipObjectsTable { + marker := NewMarker(c.settings.Command) + if err := marker.EnterTable("objects"); err != nil { + return err + } + log.Println("Generating delete queries for objects table") - info, rowsCount, errCount = c.prepareDeleteQueries(fromLedgerIdx, toLedgerIdx, + rowsCount, deleteCount, errCount = c.prepareAndExecuteDeleteQueries(marker, fromLedgerIdx, toLedgerIdx, "SELECT key, sequence FROM objects WHERE token(key) >= ? AND token(key) <= ?", - "DELETE FROM objects WHERE key = ? AND sequence = ?", true) - log.Printf("Total delete queries: %d\n", len(info.Data)) + "DELETE FROM objects WHERE key = ? AND sequence = ?", deleteMethod{deleteObject: maybe.Set(true)}, columnSettings{UseBlob: true, UseSeq: true}) + log.Printf("Total delete queries: %d\n", deleteCount) log.Printf("Total traversed rows: %d\n\n", rowsCount) totalErrors += errCount totalRows += rowsCount - deleteCount, errCount = c.performDeleteQueries(&info, columnSettings{UseBlob: true, UseSeq: true}) - totalErrors += errCount totalDeletes += deleteCount + + marker.ExitTable() } // ledger_hashes queries if !c.settings.SkipLedgerHashesTable { + marker := NewMarker(c.settings.Command) + if err := marker.EnterTable("ledger_hashes"); err != nil { + return err + } + log.Println("Generating delete queries for ledger_hashes table") - info, rowsCount, errCount = c.prepareDeleteQueries(fromLedgerIdx, toLedgerIdx, + rowsCount, deleteCount, errCount = c.prepareAndExecuteDeleteQueries(marker, fromLedgerIdx, toLedgerIdx, "SELECT hash, sequence FROM ledger_hashes WHERE token(hash) >= ? AND token(hash) <= ?", - "DELETE FROM ledger_hashes WHERE hash = ?", false) - log.Printf("Total delete queries: %d\n", len(info.Data)) + "DELETE FROM ledger_hashes WHERE hash = ?", deleteMethod{deleteGeneral: maybe.Set(true)}, columnSettings{UseBlob: true, UseSeq: false}) + log.Printf("Total delete queries: %d\n", deleteCount) log.Printf("Total traversed rows: %d\n\n", rowsCount) - totalErrors += errCount totalRows += rowsCount - deleteCount, errCount = c.performDeleteQueries(&info, columnSettings{UseBlob: true, UseSeq: false}) totalErrors += errCount totalDeletes += deleteCount + + marker.ExitTable() } // transactions queries if !c.settings.SkipTransactionsTable { + marker := NewMarker(c.settings.Command) + if err := marker.EnterTable("transactions"); err != nil { + return err + } + log.Println("Generating delete queries for transactions table") - info, rowsCount, errCount = c.prepareDeleteQueries(fromLedgerIdx, toLedgerIdx, + rowsCount, deleteCount, errCount = c.prepareAndExecuteDeleteQueries(marker, fromLedgerIdx, toLedgerIdx, "SELECT hash, ledger_sequence FROM transactions WHERE token(hash) >= ? AND token(hash) <= ?", - "DELETE FROM transactions WHERE hash = ?", false) - log.Printf("Total delete queries: %d\n", len(info.Data)) + "DELETE FROM transactions WHERE hash = ?", deleteMethod{deleteGeneral: maybe.Set(true)}, columnSettings{UseBlob: true, UseSeq: false}) + log.Printf("Total delete queries: %d\n", deleteCount) log.Printf("Total traversed rows: %d\n\n", rowsCount) - totalErrors += errCount totalRows += rowsCount - deleteCount, errCount = c.performDeleteQueries(&info, columnSettings{UseBlob: true, UseSeq: false}) totalErrors += errCount totalDeletes += deleteCount + + marker.ExitTable() } // diff queries if !c.settings.SkipDiffTable { + marker := NewMarker(c.settings.Command) + if err := marker.EnterTable("diff"); err != nil { + return err + } + log.Println("Generating delete queries for diff table") - info = c.prepareSimpleDeleteQueries(rangeFrom, rangeTo, - "DELETE FROM diff WHERE seq = ?") - log.Printf("Total delete queries: %d\n\n", len(info.Data)) - deleteCount, errCount = c.performDeleteQueries(&info, columnSettings{UseBlob: false, UseSeq: true}) + deleteCount, errCount = c.prepareAndExecuteSimpleDeleteQueries(rangeFrom, rangeTo, + "DELETE FROM diff WHERE seq = ?", columnSettings{UseBlob: false, UseSeq: true}) + log.Printf("Total delete queries: %d\n\n", deleteCount) totalErrors += errCount totalDeletes += deleteCount + + marker.ExitTable() } // ledger_transactions queries if !c.settings.SkipLedgerTransactionsTable { + marker := NewMarker(c.settings.Command) + if err := marker.EnterTable("ledger_transactions"); err != nil { + return err + } + log.Println("Generating delete queries for ledger_transactions table") - info = c.prepareSimpleDeleteQueries(rangeFrom, rangeTo, - "DELETE FROM ledger_transactions WHERE ledger_sequence = ?") - log.Printf("Total delete queries: %d\n\n", len(info.Data)) - deleteCount, errCount = c.performDeleteQueries(&info, columnSettings{UseBlob: false, UseSeq: true}) + deleteCount, errCount = c.prepareAndExecuteSimpleDeleteQueries(rangeFrom, rangeTo, + "DELETE FROM ledger_transactions WHERE ledger_sequence = ?", columnSettings{UseBlob: false, UseSeq: true}) + log.Printf("Total delete queries: %d\n\n", deleteCount) totalErrors += errCount totalDeletes += deleteCount + + marker.ExitTable() } // ledgers queries if !c.settings.SkipLedgersTable { + marker := NewMarker(c.settings.Command) + if err := marker.EnterTable("ledgers"); err != nil { + return err + } + log.Println("Generating delete queries for ledgers table") + deleteCount, errCount = c.prepareAndExecuteSimpleDeleteQueries(rangeFrom, rangeTo, + "DELETE FROM ledgers WHERE sequence = ?", columnSettings{UseBlob: false, UseSeq: true}) + log.Printf("Total delete queries: %d\n\n", deleteCount) + totalErrors += errCount + totalDeletes += deleteCount + + marker.ExitTable() + } - info = c.prepareSimpleDeleteQueries(rangeFrom, rangeTo, - "DELETE FROM ledgers WHERE sequence = ?") - log.Printf("Total delete queries: %d\n\n", len(info.Data)) - deleteCount, errCount = c.performDeleteQueries(&info, columnSettings{UseBlob: false, UseSeq: true}) + // account_tx queries + if !c.settings.SkipAccTransactionsTable { + marker := NewMarker(c.settings.Command) + if err := marker.EnterTable("account_tx"); err != nil { + return err + } + + log.Println("Generating delete queries for account transactions table") + rowsCount, deleteCount, errCount = c.prepareAndExecuteDeleteQueries(marker, fromLedgerIdx, toLedgerIdx, + "SELECT account, seq_idx FROM account_tx WHERE token(account) >= ? AND token(account) <= ?", + "DELETE FROM account_tx WHERE account = ? AND seq_idx = (?, ?)", deleteMethod{deleteTransaction: maybe.Set(true)}, columnSettings{UseBlob: true, UseSeq: false}) + log.Printf("Total delete queries: %d\n", deleteCount) + log.Printf("Total traversed rows: %d\n\n", rowsCount) + totalRows += rowsCount totalErrors += errCount totalDeletes += deleteCount + + marker.ExitTable() } - // TODO: tbd what to do with account_tx as it got tuple for seq_idx - // TODO: also, whether we need to take care of nft tables and other stuff like that + // TODO: take care of nft tables and other stuff like that if !c.settings.SkipWriteLatestLedger { var ( @@ -304,27 +407,150 @@ func (c *ClioCass) pruneData( return nil } -func (c *ClioCass) prepareSimpleDeleteQueries( +func (c *ClioCass) prepareAndExecuteSimpleDeleteQueries( fromLedgerIdx uint64, toLedgerIdx uint64, deleteQueryTemplate string, -) deleteInfo { + colSettings columnSettings, +) (uint64, uint64) { + var totalDeletes uint64 + var totalErrors uint64 + var info = deleteInfo{Query: deleteQueryTemplate} - for i := fromLedgerIdx; i <= toLedgerIdx; i++ { - info.Data = append(info.Data, deleteParams{Seq: i}) + if session, err := c.clusterConfig.CreateSession(); err == nil { + defer session.Close() + for i := fromLedgerIdx; i <= toLedgerIdx; i++ { + info.Data = append(info.Data, deleteParams{Seq: i}) + // for every 1000 queries in data, delete + if len(info.Data) == 1000 { + _, err := c.performDeleteQueries(&info, session, colSettings) + atomic.AddUint64(&totalDeletes, uint64(len(info.Data))) + atomic.AddUint64(&totalErrors, err) + info = deleteInfo{Query: deleteQueryTemplate} + } + } + // delete the rest of queries if exists + if len(info.Data) > 0 { + _, err := c.performDeleteQueries(&info, session, colSettings) + atomic.AddUint64(&totalDeletes, uint64(len(info.Data))) + atomic.AddUint64(&totalErrors, err) + } + } else { + log.Printf("ERROR: %s\n", err) + fmt.Fprintf(os.Stderr, "FAILED TO CREATE SESSION: %s\n", err) + atomic.AddUint64(&totalErrors, 1) + } + return totalDeletes, totalErrors +} + +func (c *ClioCass) prepareDefaultDelete( + scanner gocql.Scanner, + info *deleteInfo, + fromLedgerIdx maybe.Maybe[uint64], + toLedgerIdx maybe.Maybe[uint64], + rowsRetrieved *uint64, +) bool { + for scanner.Next() { + var key []byte + var seq uint64 + + err := scanner.Scan(&key, &seq) + if err == nil { + *rowsRetrieved++ + + // only grab the rows that are in the correct range of sequence numbers + if fromLedgerIdx.HasValue() && fromLedgerIdx.Value() <= seq { + info.Data = append(info.Data, deleteParams{Seq: seq, Blob: key}) + } else if toLedgerIdx.HasValue() && seq <= toLedgerIdx.Value() { + info.Data = append(info.Data, deleteParams{Seq: seq, Blob: key}) + } + } else { + return false + } + } + return true +} + +func (c *ClioCass) prepareObjectDelete( + scanner gocql.Scanner, + info *deleteInfo, + fromLedgerIdx maybe.Maybe[uint64], + toLedgerIdx maybe.Maybe[uint64], + rowsRetrieved *uint64, +) bool { + var previousKey []byte + var foundLastValid bool + var keepLastValid = true + + for scanner.Next() { + var key []byte + var seq uint64 + + err := scanner.Scan(&key, &seq) + if err == nil { + *rowsRetrieved++ + + if keepLastValid && !slices.Equal(previousKey, key) { + previousKey = key + foundLastValid = false + } + + // only grab the rows that are in the correct range of sequence numbers + if fromLedgerIdx.HasValue() && fromLedgerIdx.Value() <= seq { + info.Data = append(info.Data, deleteParams{Seq: seq, Blob: key}) + } else if toLedgerIdx.HasValue() { + if seq <= toLedgerIdx.Value() && (!keepLastValid || foundLastValid) { + info.Data = append(info.Data, deleteParams{Seq: seq, Blob: key}) + } else if seq <= toLedgerIdx.Value()+1 { + foundLastValid = true + } + } + } else { + return false + } } + return true +} - return info +func (c *ClioCass) prepareAccTxnDelete( + scanner gocql.Scanner, + info *deleteInfo, + fromLedgerIdx maybe.Maybe[uint64], + toLedgerIdx maybe.Maybe[uint64], + rowsRetrieved *uint64, +) bool { + for scanner.Next() { + var key []byte + var ledgerIndex, txnIndex uint64 + + // account_tx/nft table has seq_idx frozen> + err := scanner.Scan(&key, &ledgerIndex, &txnIndex) + if err == nil { + *rowsRetrieved++ + + // only grab the rows that are in the correct range of sequence numbers + if fromLedgerIdx.HasValue() && fromLedgerIdx.Value() <= ledgerIndex { + info.Data = append(info.Data, deleteParams{Seq: ledgerIndex, Blob: key, tnxIndex: txnIndex}) + } else if toLedgerIdx.HasValue() && ledgerIndex <= toLedgerIdx.Value() { + info.Data = append(info.Data, deleteParams{Seq: ledgerIndex, Blob: key, tnxIndex: txnIndex}) + } + } else { + return false + } + } + return true } -func (c *ClioCass) prepareDeleteQueries( +func (c *ClioCass) prepareAndExecuteDeleteQueries( + marker *Marker, fromLedgerIdx maybe.Maybe[uint64], toLedgerIdx maybe.Maybe[uint64], queryTemplate string, deleteQueryTemplate string, - keepLastValid bool, -) (deleteInfo, uint64, uint64) { + method deleteMethod, + colSettings columnSettings, +) (uint64, uint64, uint64) { rangesChannel := make(chan *util.TokenRange, len(c.settings.Ranges)) for i := range c.settings.Ranges { rangesChannel <- c.settings.Ranges[i] @@ -332,24 +558,12 @@ func (c *ClioCass) prepareDeleteQueries( close(rangesChannel) - outChannel := make(chan deleteParams) - var info = deleteInfo{Query: deleteQueryTemplate} - - go func() { - total := uint64(0) - for params := range outChannel { - total += 1 - if total%1000 == 0 { - log.Printf("... %d queries ...\n", total) - } - info.Data = append(info.Data, params) - } - }() - var wg sync.WaitGroup var sessionCreationWaitGroup sync.WaitGroup var totalRows uint64 + var totalDeletes uint64 var totalErrors uint64 + counter := uint64(1000) wg.Add(c.settings.WorkerCount) sessionCreationWaitGroup.Add(c.settings.WorkerCount) @@ -368,58 +582,66 @@ func (c *ClioCass) prepareDeleteQueries( preparedQuery := session.Query(q) for r := range rangesChannel { + if c.settings.RangesRead != nil { + if value, exists := c.settings.RangesRead.TokenRange[r.StartRange]; exists { + // Check for end range + if value == r.EndRange { + marker.MarkProgress(r.StartRange, r.EndRange) + continue + } + } + } + preparedQuery.Bind(r.StartRange, r.EndRange) var pageState []byte var rowsRetrieved uint64 - - var previousKey []byte - var foundLastValid bool + var info = deleteInfo{Query: deleteQueryTemplate} for { iter := preparedQuery.PageSize(c.clusterConfig.PageSize).PageState(pageState).Iter() nextPageState := iter.PageState() scanner := iter.Scanner() - for scanner.Next() { - var key []byte - var seq uint64 + var prepareDeleteResult bool - err = scanner.Scan(&key, &seq) - if err == nil { - rowsRetrieved++ - - if keepLastValid && !slices.Equal(previousKey, key) { - previousKey = key - foundLastValid = false - } + // query object table first as it is the largest table by far + if method.deleteObject.HasValue() && method.deleteObject.Value() { + prepareDeleteResult = c.prepareObjectDelete(scanner, &info, fromLedgerIdx, toLedgerIdx, &rowsRetrieved) + } else if method.deleteTransaction.HasValue() && method.deleteTransaction.Value() { + prepareDeleteResult = c.prepareAccTxnDelete(scanner, &info, fromLedgerIdx, toLedgerIdx, &rowsRetrieved) + } else if method.deleteGeneral.HasValue() && method.deleteGeneral.Value() { + prepareDeleteResult = c.prepareDefaultDelete(scanner, &info, fromLedgerIdx, toLedgerIdx, &rowsRetrieved) + } - // only grab the rows that are in the correct range of sequence numbers - if fromLedgerIdx.HasValue() && fromLedgerIdx.Value() <= seq { - outChannel <- deleteParams{Seq: seq, Blob: key} - } else if toLedgerIdx.HasValue() { - if seq < toLedgerIdx.Value() && (!keepLastValid || foundLastValid) { - outChannel <- deleteParams{Seq: seq, Blob: key} - } else if seq <= toLedgerIdx.Value()+1 { - foundLastValid = true - } - } - } else { - log.Printf("ERROR: page iteration failed: %s\n", err) - fmt.Fprintf(os.Stderr, "FAILED QUERY: %s\n", fmt.Sprintf("%s [from=%d][to=%d][pagestate=%x]", queryTemplate, r.StartRange, r.EndRange, pageState)) - atomic.AddUint64(&totalErrors, 1) - } + if !prepareDeleteResult { + log.Printf("ERROR: page iteration failed: %s\n", err) + fmt.Fprintf(os.Stderr, "FAILED QUERY: %s\n", fmt.Sprintf("%s [from=%d][to=%d][pagestate=%x]", queryTemplate, r.StartRange, r.EndRange, pageState)) + atomic.AddUint64(&totalErrors, 1) } if len(nextPageState) == 0 { + // Checks for delete queries after iterating all pages + if len(info.Data) > 0 { + _, numErr := c.performDeleteQueries(&info, session, colSettings) + atomic.AddUint64(&totalErrors, numErr) + atomic.AddUint64(&totalDeletes, uint64(len(info.Data))) + if totalDeletes >= counter { + log.Printf("... deleted %d queries ...", counter) + counter += 1000 + } + // reset back to the deleted query template after finishing executing delete + info = deleteInfo{Query: deleteQueryTemplate} + } break } - pageState = nextPageState } - + marker.MarkProgress(r.StartRange, r.EndRange) atomic.AddUint64(&totalRows, rowsRetrieved) } + // after finishing deletion of one table, set to nil, because we continue to delete normally now + c.settings.RangesRead = nil } else { log.Printf("ERROR: %s\n", err) fmt.Fprintf(os.Stderr, "FAILED TO CREATE SESSION: %s\n", err) @@ -429,9 +651,7 @@ func (c *ClioCass) prepareDeleteQueries( } wg.Wait() - close(outChannel) - - return info, totalRows, totalErrors + return totalRows, totalDeletes, totalErrors } func (c *ClioCass) splitDeleteWork(info *deleteInfo) [][]deleteParams { @@ -461,7 +681,7 @@ func (c *ClioCass) splitDeleteWork(info *deleteInfo) [][]deleteParams { return chunks } -func (c *ClioCass) performDeleteQueries(info *deleteInfo, colSettings columnSettings) (uint64, uint64) { +func (c *ClioCass) performDeleteQueries(info *deleteInfo, session *gocql.Session, colSettings columnSettings) (uint64, uint64) { var wg sync.WaitGroup var sessionCreationWaitGroup sync.WaitGroup var totalDeletes uint64 @@ -485,43 +705,35 @@ func (c *ClioCass) performDeleteQueries(info *deleteInfo, colSettings columnSett go func(number int, q string, bc int) { defer wg.Done() - var session *gocql.Session - var err error - if session, err = c.clusterConfig.CreateSession(); err == nil { - defer session.Close() - - sessionCreationWaitGroup.Done() - sessionCreationWaitGroup.Wait() - preparedQuery := session.Query(q) - - for chunk := range chunksChannel { - for _, r := range chunk { - if bc == 2 { - preparedQuery.Bind(r.Blob, r.Seq) - } else if bc == 1 { - if colSettings.UseSeq { - preparedQuery.Bind(r.Seq) - } else if colSettings.UseBlob { - preparedQuery.Bind(r.Blob) - } + sessionCreationWaitGroup.Done() + sessionCreationWaitGroup.Wait() + preparedQuery := session.Query(q) + + for chunk := range chunksChannel { + for _, r := range chunk { + if bc == 3 { + preparedQuery.Bind(r.Blob, r.Seq, r.tnxIndex) + } else if bc == 2 { + preparedQuery.Bind(r.Blob, r.Seq) + } else if bc == 1 { + if colSettings.UseSeq { + preparedQuery.Bind(r.Seq) + } else if colSettings.UseBlob { + preparedQuery.Bind(r.Blob) } + } - if err := preparedQuery.Exec(); err != nil { - log.Printf("DELETE ERROR: %s\n", err) - fmt.Fprintf(os.Stderr, "FAILED QUERY: %s\n", fmt.Sprintf("%s [blob=0x%x][seq=%d]", info.Query, r.Blob, r.Seq)) - atomic.AddUint64(&totalErrors, 1) - } else { - atomic.AddUint64(&totalDeletes, 1) - if atomic.LoadUint64(&totalDeletes)%10000 == 0 { - log.Printf("... %d deletes ...\n", totalDeletes) - } + if err := preparedQuery.Exec(); err != nil { + log.Printf("DELETE ERROR: %s\n", err) + fmt.Fprintf(os.Stderr, "FAILED QUERY: %s\n", fmt.Sprintf("%s [blob=0x%x][seq=%d]", info.Query, r.Blob, r.Seq)) + atomic.AddUint64(&totalErrors, 1) + } else { + atomic.AddUint64(&totalDeletes, 1) + if atomic.LoadUint64(&totalDeletes)%10000 == 0 { + log.Printf("... %d deletes ...\n", totalDeletes) } } } - } else { - log.Printf("ERROR: %s\n", err) - fmt.Fprintf(os.Stderr, "FAILED TO CREATE SESSION: %s\n", err) - atomic.AddUint64(&totalErrors, 1) } }(i, query, bindCount) } diff --git a/tools/cassandra_delete_range/internal/util/util.go b/tools/cassandra_delete_range/internal/util/util.go index 9a2aa1a4f..868713d28 100644 --- a/tools/cassandra_delete_range/internal/util/util.go +++ b/tools/cassandra_delete_range/internal/util/util.go @@ -14,6 +14,12 @@ type TokenRange struct { EndRange int64 } +// not stored as arrays of startRange/endRange because it will be O(n) lookup +// stored as Map with key startRange, value endRange so O(1) lookup for tokenRange +type StoredRange struct { + TokenRange map[int64]int64 // all ranges that has been read and deleted +} + func Shuffle(data []*TokenRange) { for i := 1; i < len(data); i++ { r := rand.Intn(i + 1)