diff --git a/integration/integration_test.go b/integration/integration_test.go
index ba9f4d60..5cebb7f5 100644
--- a/integration/integration_test.go
+++ b/integration/integration_test.go
@@ -146,15 +146,21 @@ func TestLiveLogIntegration(t *testing.T) {
 	}
 
 	// Step 3 - Validate checkpoint size increment.
-	checkpoint, _, _, err = client.FetchCheckpoint(ctx, logRead, noteVerifier, noteVerifier.Name())
-	if err != nil {
-		t.Errorf("client.FetchCheckpoint: %v", err)
-	}
-	if checkpoint == nil {
-		t.Fatal("checkpoint not found")
+	var gotIncrease uint64
+	for gotIncrease != uint64(*testEntrySize) {
+		checkpoint, _, _, err = client.FetchCheckpoint(ctx, logRead, noteVerifier, noteVerifier.Name())
+		if err != nil {
+			t.Errorf("client.FetchCheckpoint: %v", err)
+		}
+		if checkpoint == nil {
+			t.Fatal("checkpoint not found")
+		}
+		t.Logf("checkpoint final size: %d", checkpoint.Size)
+		gotIncrease = checkpoint.Size - checkpointInitSize
+
+		time.Sleep(100 * time.Millisecond)
 	}
-	t.Logf("checkpoint final size: %d", checkpoint.Size)
-	gotIncrease := checkpoint.Size - checkpointInitSize
+
 	if gotIncrease != uint64(*testEntrySize) {
 		t.Errorf("checkpoint size increase got: %d, want: %d", gotIncrease, *testEntrySize)
 	}
diff --git a/storage/mysql/mysql.go b/storage/mysql/mysql.go
index 457cc668..36c2ef53 100644
--- a/storage/mysql/mysql.go
+++ b/storage/mysql/mysql.go
@@ -22,6 +22,7 @@ import (
 	"errors"
 	"fmt"
 	"strings"
+	"time"
 
 	_ "github.com/go-sql-driver/mysql"
 	"github.com/transparency-dev/merkle/rfc6962"
@@ -32,16 +33,22 @@ import (
 )
 
 const (
-	selectCheckpointByIDSQL          = "SELECT `note` FROM `Checkpoint` WHERE `id` = ?"
-	selectCheckpointByIDForUpdateSQL = selectCheckpointByIDSQL + " FOR UPDATE"
-	replaceCheckpointSQL             = "REPLACE INTO `Checkpoint` (`id`, `note`) VALUES (?, ?)"
-	selectSubtreeByLevelAndIndexSQL  = "SELECT `nodes` FROM `Subtree` WHERE `level` = ? AND `index` = ?"
-	replaceSubtreeSQL                = "REPLACE INTO `Subtree` (`level`, `index`, `nodes`) VALUES (?, ?, ?)"
-	selectTiledLeavesSQL             = "SELECT `data` FROM `TiledLeaves` WHERE `tile_index` = ?"
-	replaceTiledLeavesSQL            = "REPLACE INTO `TiledLeaves` (`tile_index`, `data`) VALUES (?, ?)"
-
+	selectNextSeqIndexByIDSQL          = "SELECT `next_sequence_index` FROM `SequencingMetadata` WHERE `id` = ?"
+	selectNextSeqIndexByIDForUpdateSQL = selectNextSeqIndexByIDSQL + " FOR UPDATE"
+	replaceNextSeqIndexSQL             = "REPLACE INTO `SequencingMetadata` (`id`, `next_sequence_index`) VALUES (?, ?)"
+	selectCheckpointByIDSQL            = "SELECT `note` FROM `Checkpoint` WHERE `id` = ?"
+	selectCheckpointByIDForUpdateSQL   = selectCheckpointByIDSQL + " FOR UPDATE"
+	replaceCheckpointSQL               = "REPLACE INTO `Checkpoint` (`id`, `note`) VALUES (?, ?)"
+	selectSubtreeByLevelAndIndexSQL    = "SELECT `nodes` FROM `Subtree` WHERE `level` = ? AND `index` = ?"
+	replaceSubtreeSQL                  = "REPLACE INTO `Subtree` (`level`, `index`, `nodes`) VALUES (?, ?, ?)"
+	selectTiledLeavesSQL               = "SELECT `data` FROM `TiledLeaves` WHERE `tile_index` = ?"
+	replaceTiledLeavesSQL              = "REPLACE INTO `TiledLeaves` (`tile_index`, `data`) VALUES (?, ?)"
+
+	nextSeqIndexID  = 0
 	checkpointID    = 0
 	entryBundleSize = 256
+
+	defaultIntegrationSizeLimit = 10 * entryBundleSize
 )
 
 // Storage is a MySQL-based storage implementation for Tessera.
@@ -70,7 +77,13 @@ func New(ctx context.Context, db *sql.DB, opts ...func(*tessera.StorageOptions))
 		return nil, errors.New("tessera.WithCheckpointSignerVerifier must be provided in New()")
 	}
 
-	s.queue = storage.NewQueue(ctx, opt.BatchMaxAge, opt.BatchMaxSize, s.sequenceBatch)
+	s.queue = storage.NewQueue(ctx, opt.BatchMaxAge, opt.BatchMaxSize, s.sequenceEntries)
+
+	// Initialize the next sequence index if there is no row in the SequencingMetadata table.
+	if err := s.initNextSequenceIndex(ctx); err != nil {
+		klog.Errorf("Failed to initialize next sequence index: %v", err)
+		return nil, err
+	}
 
 	// Initialize checkpoint if there is no row in the Checkpoint table.
 	checkpoint, err := s.ReadCheckpoint(ctx)
@@ -101,9 +114,75 @@ func New(ctx context.Context, db *sql.DB, opts ...func(*tessera.StorageOptions))
 		}
 	}
 
+	// Run integration every 1 second.
+	go func() {
+		t := time.NewTicker(1 * time.Second)
+		defer t.Stop()
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-t.C:
+			}
+
+			func() {
+				cctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+				defer cancel()
+
+				if err := s.consumeEntries(cctx, defaultIntegrationSizeLimit); err != nil {
+					klog.Errorf("consumeEntries: %v", err)
+				}
+			}()
+		}
+	}()
+
 	return s, nil
 }
 
+// initNextSequenceIndex initializes the next sequence index if there is no row in the SequencingMetadata table.
+func (s *Storage) initNextSequenceIndex(ctx context.Context) error {
+	row := s.db.QueryRowContext(ctx, selectNextSeqIndexByIDSQL, nextSeqIndexID)
+	if err := row.Err(); err != nil {
+		return err
+	}
+	var seqIndex uint64
+	if err := row.Scan(&seqIndex); err != nil {
+		if err == sql.ErrNoRows {
+			klog.Infof("Initializing next sequence index")
+			// Get a Tx for making transaction requests.
+			tx, err := s.db.BeginTx(ctx, nil)
+			if err != nil {
+				return err
+			}
+			// Defer a rollback in case anything fails.
+			defer func() {
+				if err := tx.Rollback(); err != nil && err != sql.ErrTxDone {
+					klog.Errorf("Failed to rollback in write initial next sequence index: %v", err)
+				}
+			}()
+			if err := s.writeNextSequenceIndex(ctx, tx, 0); err != nil {
+				klog.Errorf("Failed to write initial next sequence index: %v", err)
+				return err
+			}
+			// Commit the transaction.
+			return tx.Commit()
+		}
+		return err
+	}
+
+	return nil
+}
+
+// writeNextSequenceIndex stores the next sequence index.
+func (s *Storage) writeNextSequenceIndex(ctx context.Context, tx *sql.Tx, nextSeqIndex uint64) error {
+	if _, err := tx.ExecContext(ctx, replaceNextSeqIndexSQL, nextSeqIndexID, nextSeqIndex); err != nil {
+		klog.Errorf("Failed to execute replaceNextSeqIndexSQL: %v", err)
+		return err
+	}
+
+	return nil
+}
+
 // ReadCheckpoint returns the latest stored checkpoint.
 // If the checkpoint is not found, nil is returned with no error.
 func (s *Storage) ReadCheckpoint(ctx context.Context) ([]byte, error) {
@@ -220,15 +299,14 @@ func (s *Storage) Add(ctx context.Context, entry *tessera.Entry) tessera.IndexFu
 	return s.queue.Add(ctx, entry)
 }
 
-// sequenceBatch writes the entries from the provided batch into the entry bundle files of the log.
+// sequenceEntries writes the entries from the provided batch into the entry bundle files of the log,
+// and durably assigns each of the passed-in entries an index in the log.
 //
 // This func starts filling entries bundles at the next available slot in the log, ensuring that the
 // sequenced entries are contiguous from the zeroth entry (i.e left-hand dense).
 // We try to minimise the number of partially complete entry bundles by writing entries in chunks rather
 // than one-by-one.
-//
-// TODO(#21): Separate sequencing and integration for better performance.
-func (s *Storage) sequenceBatch(ctx context.Context, entries []*tessera.Entry) error {
+func (s *Storage) sequenceEntries(ctx context.Context, entries []*tessera.Entry) error {
 	// Return when there is no entry to sequence.
 	if len(entries) == 0 {
 		return nil
 	}
@@ -242,36 +320,103 @@ func (s *Storage) sequenceBatch(ctx context.Context, entries []*tessera.Entry) e
 	// Defer a rollback in case anything fails.
 	defer func() {
 		if err := tx.Rollback(); err != nil && err != sql.ErrTxDone {
-			klog.Errorf("Failed to rollback in sequenceBatch: %v", err)
+			klog.Errorf("Failed to rollback in sequenceEntries: %v", err)
 		}
 	}()
 
-	// Get tree size from checkpoint. Note that "SELECT ... FOR UPDATE" is used for row-level locking.
-	// TODO(#21): Optimize how we get the tree size without parsing and verifying the checkpoints every time.
-	row := tx.QueryRowContext(ctx, selectCheckpointByIDForUpdateSQL, checkpointID)
+	// Get the next sequence index. Note that "SELECT ... FOR UPDATE" is used for row-level locking.
+	row := tx.QueryRowContext(ctx, selectNextSeqIndexByIDForUpdateSQL, nextSeqIndexID)
 	if err := row.Err(); err != nil {
 		return err
 	}
-	var rawCheckpoint []byte
-	if err := row.Scan(&rawCheckpoint); err != nil {
-		return fmt.Errorf("failed to read checkpoint: %w", err)
+	var nextSeqIndex uint64
+	if err := row.Scan(&nextSeqIndex); err != nil {
+		return fmt.Errorf("failed to read next sequence index: %w", err)
 	}
-	checkpoint, err := s.parseCheckpoint(rawCheckpoint)
-	if err != nil {
-		return fmt.Errorf("failed to verify checkpoint: %w", err)
+
+	sequencedEntries := make([]storage.SequencedEntry, len(entries))
+	// Assign provisional sequence numbers to entries.
+	// We need to do this here in order to support serialisations which include the log position.
+	for i, e := range entries {
+		sequencedEntries[i] = storage.SequencedEntry{
+			BundleData: e.MarshalBundleData(nextSeqIndex + uint64(i)),
+			LeafHash:   e.LeafHash(),
+		}
+	}
+
+	// Add sequenced entries to entry bundles.
+	bundleIndex, entriesInBundle := nextSeqIndex/entryBundleSize, nextSeqIndex%entryBundleSize
+	bundleWriter := &bytes.Buffer{}
+
+	// If the latest bundle is partial, we need to read the data it contains in for our newer, larger, bundle.
+	if entriesInBundle > 0 {
+		row := tx.QueryRowContext(ctx, selectTiledLeavesSQL, bundleIndex)
+		if err := row.Err(); err != nil {
+			return err
+		}
+
+		var partialEntryBundle []byte
+		if err := row.Scan(&partialEntryBundle); err != nil {
+			return fmt.Errorf("row.Scan: %w", err)
+		}
+
+		if _, err := bundleWriter.Write(partialEntryBundle); err != nil {
+			return fmt.Errorf("bundleWriter: %w", err)
+		}
+	}
+
+	// Add new entries to the bundle.
+	for _, e := range sequencedEntries {
+		if _, err := bundleWriter.Write(e.BundleData); err != nil {
+			return fmt.Errorf("bundleWriter.Write: %w", err)
+		}
+		entriesInBundle++
+
+		// This bundle is full, so we need to write it out.
+		if entriesInBundle == entryBundleSize {
+			if err := s.writeEntryBundle(ctx, tx, bundleIndex, bundleWriter.Bytes()); err != nil {
+				return fmt.Errorf("writeEntryBundle: %w", err)
+			}
+
+			// Prepare the next entry bundle for any remaining entries in the batch.
+			bundleIndex++
+			entriesInBundle = 0
+			bundleWriter = &bytes.Buffer{}
+		}
+	}
+
+	// If we have a partial bundle remaining once we've added all the entries from the batch,
+	// this needs writing out too.
+	if entriesInBundle > 0 {
+		if err := s.writeEntryBundle(ctx, tx, bundleIndex, bundleWriter.Bytes()); err != nil {
+			return fmt.Errorf("writeEntryBundle: %w", err)
+		}
 	}
 
-	// Integrate the new entries into the entry bundle (TiledLeaves table) and tile (Subtree table).
-	if err := s.integrate(ctx, tx, checkpoint.Size, entries); err != nil {
-		return fmt.Errorf("failed to integrate: %w", err)
+	// Update the last sequenced entry index.
+	if err := s.writeNextSequenceIndex(ctx, tx, nextSeqIndex+uint64(len(entries))); err != nil {
+		return fmt.Errorf("writeNextSequenceIndex: %w", err)
 	}
 
 	// Commit the transaction.
 	return tx.Commit()
 }
 
-// integrate incorporates the provided entries into the log starting at fromSeq.
-func (s *Storage) integrate(ctx context.Context, tx *sql.Tx, fromSeq uint64, entries []*tessera.Entry) error {
+// consumeEntries fetches the sequenced entries, integrates them into the log, and issues a new checkpoint.
+func (s *Storage) consumeEntries(ctx context.Context, limit uint64) error {
+	klog.Info("consumeEntries start")
+	// Get a Tx for making transaction requests.
+	tx, err := s.db.BeginTx(ctx, nil)
+	if err != nil {
+		return err
+	}
+	// Defer a rollback in case anything fails.
+	defer func() {
+		if err := tx.Rollback(); err != nil && err != sql.ErrTxDone {
+			klog.Errorf("Failed to rollback in consumeEntries: %v", err)
		}
+	}()
+
 	tb := storage.NewTreeBuilder(func(ctx context.Context, tileIDs []storage.TileID, treeSize uint64) ([]*api.HashTile, error) {
 		hashTiles := make([]*api.HashTile, len(tileIDs))
 		if len(tileIDs) == 0 {
@@ -322,66 +467,85 @@ func (s *Storage) integrate(ctx context.Context, tx *sql.Tx, fromSeq uint64, ent
 		return hashTiles, nil
 	})
 
-	sequencedEntries := make([]storage.SequencedEntry, len(entries))
-	// Assign provisional sequence numbers to entries.
-	// We need to do this here in order to support serialisations which include the log position.
-	for i, e := range entries {
-		sequencedEntries[i] = storage.SequencedEntry{
-			BundleData: e.MarshalBundleData(fromSeq + uint64(i)),
-			LeafHash:   e.LeafHash(),
-		}
+	// Get the next sequence index without a transaction.
+	row := s.db.QueryRowContext(ctx, selectNextSeqIndexByIDSQL, nextSeqIndexID)
+	if err := row.Err(); err != nil {
+		return err
+	}
+	var nextSeqIndex uint64
+	if err := row.Scan(&nextSeqIndex); err != nil {
+		return fmt.Errorf("failed to read next sequence index: %w", err)
 	}
 
-	// Add sequenced entries to entry bundles.
-	bundleIndex, entriesInBundle := fromSeq/entryBundleSize, fromSeq%entryBundleSize
-	bundleWriter := &bytes.Buffer{}
+	klog.Infof("consumeEntries nextSeqIndex: %d", nextSeqIndex)
 
-	// If the latest bundle is partial, we need to read the data it contains in for our newer, larger, bundle.
-	if entriesInBundle > 0 {
-		row := tx.QueryRowContext(ctx, selectTiledLeavesSQL, bundleIndex)
-		if err := row.Err(); err != nil {
-			return err
-		}
+	// Get tree size from checkpoint. Note that "SELECT ... FOR UPDATE" is used for row-level locking.
+	// TODO(#21): Optimize how we get the tree size without parsing and verifying the checkpoints every time.
+	row = tx.QueryRowContext(ctx, selectCheckpointByIDForUpdateSQL, checkpointID)
+	if err := row.Err(); err != nil {
+		return err
+	}
+	var rawCheckpoint []byte
+	if err := row.Scan(&rawCheckpoint); err != nil {
+		return fmt.Errorf("failed to read checkpoint: %w", err)
+	}
+	checkpoint, err := s.parseCheckpoint(rawCheckpoint)
+	if err != nil {
+		return fmt.Errorf("failed to verify checkpoint: %w", err)
+	}
 
-		var partialEntryBundle []byte
-		if err := row.Scan(&partialEntryBundle); err != nil {
-			return fmt.Errorf("row.Scan: %w", err)
-		}
+	klog.Infof("consumeEntries checkpoint.Size: %d", checkpoint.Size)
 
-		if _, err := bundleWriter.Write(partialEntryBundle); err != nil {
-			return fmt.Errorf("bundleWriter: %w", err)
-		}
+	// Return when there is no entry to integrate. Check before subtracting: both values are uint64,
+	// and a dirty read can leave the next sequence index smaller than the checkpoint size.
+	if nextSeqIndex <= checkpoint.Size {
+		return nil
 	}
+
+	integrateEntriesSize := nextSeqIndex - checkpoint.Size
 
-	// Add new entries to the bundle.
-	for _, e := range sequencedEntries {
-		if _, err := bundleWriter.Write(e.BundleData); err != nil {
-			return fmt.Errorf("bundleWriter.Write: %w", err)
+	// Fetch the sequenced entries that are not yet integrated.
+	sequencedEntries := []storage.SequencedEntry{}
+
+	for integrateEntriesSize > 0 {
+		entryBundleIndex := nextSeqIndex / entryBundleSize
+		row := tx.QueryRowContext(ctx, selectTiledLeavesSQL, entryBundleIndex)
+		if err := row.Err(); err != nil {
+			return err
 		}
-		entriesInBundle++
 
-		// This bundle is full, so we need to write it out.
-		if entriesInBundle == entryBundleSize {
-			if err := s.writeEntryBundle(ctx, tx, bundleIndex, bundleWriter.Bytes()); err != nil {
-				return fmt.Errorf("writeEntryBundle: %w", err)
+		var entryBundle []byte
+		if err := row.Scan(&entryBundle); err != nil {
+			if err == sql.ErrNoRows {
+				return nil
 			}
+			return err
+		}
 
-			// Prepare the next entry bundle for any remaining entries in the batch.
-			bundleIndex++
-			entriesInBundle = 0
-			bundleWriter = &bytes.Buffer{}
+		bundle := api.EntryBundle{}
+		if err := bundle.UnmarshalText(entryBundle); err != nil {
+			return fmt.Errorf("failed to parse EntryBundle at index %d: %w", entryBundleIndex, err)
 		}
-	}
+		for i, data := range bundle.Entries {
+			if integrateEntriesSize == 0 {
+				break
+			}
+			if entryBundleIndex*entryBundleSize+uint64(i) < checkpoint.Size {
+				continue
+			}
 
-	// If we have a partial bundle remaining once we've added all the entries from the batch,
-	// this needs writing out too.
-	if entriesInBundle > 0 {
-		if err := s.writeEntryBundle(ctx, tx, bundleIndex, bundleWriter.Bytes()); err != nil {
-			return fmt.Errorf("writeEntryBundle: %w", err)
+			sequencedEntries = append(sequencedEntries, storage.SequencedEntry{
+				BundleData: entryBundle,
+				LeafHash:   rfc6962.DefaultHasher.HashLeaf(data),
+			})
+			integrateEntriesSize--
 		}
+
+		klog.Infof("consumeEntries integrateEntriesSize: %d", integrateEntriesSize)
 	}
 
-	newSize, newRoot, tiles, err := tb.Integrate(ctx, fromSeq, sequencedEntries)
+	// Integrate sequenced entries into the log.
+	newSize, newRoot, tiles, err := tb.Integrate(ctx, checkpoint.Size, sequencedEntries)
 	if err != nil {
 		return fmt.Errorf("tb.Integrate: %v", err)
 	}
@@ -400,5 +564,7 @@ func (s *Storage) integrate(ctx context.Context, tx *sql.Tx, fromSeq uint64, ent
 	if err := s.writeCheckpoint(ctx, tx, newSize, newRoot); err != nil {
 		return fmt.Errorf("writeCheckpoint: %w", err)
 	}
-	return nil
+
+	// Commit the transaction.
+	return tx.Commit()
 }
diff --git a/storage/mysql/mysql_test.go b/storage/mysql/mysql_test.go
index a52ce212..216d8726 100644
--- a/storage/mysql/mysql_test.go
+++ b/storage/mysql/mysql_test.go
@@ -28,6 +28,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/transparency-dev/formats/log"
 	"github.com/transparency-dev/merkle/rfc6962"
 	tessera "github.com/transparency-dev/trillian-tessera"
 	"github.com/transparency-dev/trillian-tessera/api"
@@ -102,7 +103,7 @@ func TestMain(m *testing.M) {
 // `multiStatements=true` in the data source name allows multiple statements in one query.
 // This is not being used in the actual MySQL storage implementation.
 func initDatabaseSchema(ctx context.Context) {
-	dropTablesSQL := "DROP TABLE IF EXISTS `Checkpoint`, `Subtree`, `TiledLeaves`"
+	dropTablesSQL := "DROP TABLE IF EXISTS `SequencingMetadata`, `Checkpoint`, `Subtree`, `TiledLeaves`"
 
 	rawSchema, err := os.ReadFile("schema.sql")
 	if err != nil {
@@ -280,11 +281,27 @@ func TestTileRoundTrip(t *testing.T) {
 		},
 	} {
 		t.Run(test.name, func(t *testing.T) {
+			cpSize := uint64(0)
+
 			entryIndex, err := s.Add(ctx, tessera.NewEntry(test.entry))()
 			if err != nil {
 				t.Errorf("Add got err: %v", err)
 			}
 
+			for cpSize <= entryIndex {
+				time.Sleep(100 * time.Millisecond)
+
+				cpRaw, err := s.ReadCheckpoint(ctx)
+				if err != nil {
+					t.Errorf("ReadCheckpoint got err: %v", err)
+				}
+				cp, _, _, err := log.ParseCheckpoint(cpRaw, noteVerifier.Name(), noteVerifier)
+				if err != nil {
+					t.Errorf("log.ParseCheckpoint got err: %v", err)
+				}
+				cpSize = cp.Size
+			}
+
 			tileLevel, tileIndex, _, nodeIndex := layout.NodeCoordsToTileAddress(0, entryIndex)
 			tileRaw, err := s.ReadTile(ctx, tileLevel, tileIndex, nodeIndex)
 			if err != nil {
diff --git a/storage/mysql/schema.sql b/storage/mysql/schema.sql
index 485c4fb4..e07bb604 100644
--- a/storage/mysql/schema.sql
+++ b/storage/mysql/schema.sql
@@ -14,7 +14,16 @@
 
 -- MySQL version of the Trillian Tessera database schema.
 
--- "Checkpoint" table stores a single row that records the current state of the log. It is updated after every sequence and integration.
+-- "SequencingMetadata" table stores the next sequence index. It is updated after every sequencing.
+CREATE TABLE IF NOT EXISTS `SequencingMetadata` (
+  -- id is expected to be always 0 to maintain a maximum of a single row.
+  `id` INT UNSIGNED NOT NULL,
+  -- next_sequence_index is the index of the next to-be-sequenced entry.
+  `next_sequence_index` BIGINT UNSIGNED NOT NULL,
+  PRIMARY KEY(`id`)
+);
+
+-- "Checkpoint" table stores a single row that records the current state of the log. It is updated after every integration.
 CREATE TABLE IF NOT EXISTS `Checkpoint` (
   -- id is expected to be always 0 to maintain a maximum of a single row.
   `id` INT UNSIGNED NOT NULL,
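
Note on using the new split (illustrative sketch, not part of the patch): with sequencing and integration decoupled, the index returned by Add is durably assigned as soon as sequenceEntries commits, but it only becomes visible to readers once the ticker-driven consumeEntries has integrated it and published a fresh checkpoint. The tests above poll for this; the sketch below factors that pattern into a reusable helper with a bounded wait. awaitIntegration is a hypothetical name, and the wiring (a *mysql.Storage plus the note verifier used in the tests) is assumed rather than taken from this diff.

package example

import (
	"context"
	"fmt"
	"time"

	"github.com/transparency-dev/formats/log"
	"github.com/transparency-dev/trillian-tessera/storage/mysql"
	"golang.org/x/mod/sumdb/note"
)

// awaitIntegration blocks until the published checkpoint covers the entry at
// index, or until ctx is cancelled. It mirrors the polling loops added to
// integration_test.go and mysql_test.go in this change.
func awaitIntegration(ctx context.Context, s *mysql.Storage, v note.Verifier, index uint64) error {
	for {
		cpRaw, err := s.ReadCheckpoint(ctx)
		if err != nil {
			return fmt.Errorf("ReadCheckpoint: %w", err)
		}
		// ReadCheckpoint returns nil with no error if no checkpoint has been stored yet.
		if cpRaw != nil {
			cp, _, _, err := log.ParseCheckpoint(cpRaw, v.Name(), v)
			if err != nil {
				return fmt.Errorf("ParseCheckpoint: %w", err)
			}
			// The entry is integrated once the tree size exceeds its index.
			if cp.Size > index {
				return nil
			}
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(100 * time.Millisecond):
		}
	}
}

Wrapping the wait in a context deadline (e.g. context.WithTimeout) avoids the unbounded spin the raw test loops can fall into if integration stalls.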