diff --git a/cmd/data-node/commands/start/node_pre.go b/cmd/data-node/commands/start/node_pre.go index 3b0b8b2a45..dfb7947ef8 100644 --- a/cmd/data-node/commands/start/node_pre.go +++ b/cmd/data-node/commands/start/node_pre.go @@ -375,7 +375,8 @@ func (l *NodeCommand) initialiseNetworkHistory(preLog *logging.Logger, connConfi l.snapshotService, networkHistoryStore, l.conf.API.Port, - l.vegaPaths.StatePathFor(paths.DataNodeNetworkHistorySnapshotCopyTo)) + l.vegaPaths.StatePathFor(paths.DataNodeNetworkHistorySnapshotCopyTo), + ) if err != nil { return fmt.Errorf("failed to create networkHistory service:%w", err) } diff --git a/datanode/networkhistory/service.go b/datanode/networkhistory/service.go index b60f77ef4b..58a78c1f42 100644 --- a/datanode/networkhistory/service.go +++ b/datanode/networkhistory/service.go @@ -70,8 +70,26 @@ func New(ctx context.Context, log *logging.Logger, chainID string, cfg Config, c datanodeGrpcAPIPort: datanodeGrpcAPIPort, } + go func() { + ticker := time.NewTicker(5 * time.Second) + for { + select { + case <-ctx.Done(): + s.log.Info("saving network history before before leaving") + // consume all pending files + s.snapshotService.Flush() + s.log.Info("network history saved (maybe)") + return + case <-ticker.C: + s.snapshotService.Flush() + } + } + }() + if cfg.Publish { var err error + + // publish all file which are ready go func() { ticker := time.NewTicker(5 * time.Second) for { @@ -182,6 +200,11 @@ func (d *Service) CreateAndPublishSegment(ctx context.Context, chainID string, t } } + // empty the file worker + d.log.Info("saving network history to disk") + d.snapshotService.Flush() + d.log.Info("network history saved to disk") + if err = d.PublishSegments(ctx); err != nil { return fmt.Errorf("failed to publish snapshots: %w", err) } diff --git a/datanode/networkhistory/service_test.go b/datanode/networkhistory/service_test.go index 2cd98f02c8..5806d5648c 100644 --- a/datanode/networkhistory/service_test.go +++ b/datanode/networkhistory/service_test.go @@ -148,7 +148,7 @@ func TestMain(t *testing.M) { pgLog *bytes.Buffer, ) { sqlConfig = config - log.Infof("DB Connection String: ", sqlConfig.ConnectionConfig.GetConnectionString()) + log.Infof("DB Connection String: %v", sqlConfig.ConnectionConfig.GetConnectionString()) pool, err := sqlstore.CreateConnectionPool(outerCtx, sqlConfig.ConnectionConfig) if err != nil { @@ -184,7 +184,7 @@ func TestMain(t *testing.M) { panic(fmt.Errorf("failed to create snapshot: %w", err)) } - waitForSnapshotToComplete(ss) + waitForSnapshotToComplete2(ss, snapshotService.Flush) snapshots = append(snapshots, ss) @@ -211,7 +211,7 @@ func TestMain(t *testing.M) { panic(fmt.Errorf("failed to create snapshot:%w", err)) } - waitForSnapshotToComplete(lastSnapshot) + waitForSnapshotToComplete2(lastSnapshot, snapshotService.Flush) snapshots = append(snapshots, lastSnapshot) md5Hash, err := Md5Hash(lastSnapshot.UnpublishedSnapshotDataDirectory()) if err != nil { @@ -268,7 +268,7 @@ func TestMain(t *testing.M) { panic(fmt.Errorf("failed to create snapshot:%w", err)) } - waitForSnapshotToComplete(lastSnapshot) + waitForSnapshotToComplete2(lastSnapshot, snapshotService.Flush) snapshots = append(snapshots, lastSnapshot) md5Hash, err := Md5Hash(lastSnapshot.UnpublishedSnapshotDataDirectory()) if err != nil { @@ -421,6 +421,7 @@ func TestLoadingDataFetchedAsynchronously(t *testing.T) { require.Equal(t, int64(1000), fetched) networkhistoryService := setupNetworkHistoryService(ctx, log, snapshotService, networkHistoryStore, snapshotCopyToPath) + segments, err := networkhistoryService.ListAllHistorySegments() require.NoError(t, err) @@ -583,7 +584,7 @@ func TestRestoringNodeThatAlreadyContainsData(t *testing.T) { ss, err := service.CreateSnapshotAsynchronously(ctx, chainId, lastCommittedBlockHeight) require.NoError(t, err) - waitForSnapshotToComplete(ss) + waitForSnapshotToComplete2(ss, snapshotService.Flush) md5Hash, err = Md5Hash(ss.UnpublishedSnapshotDataDirectory()) require.NoError(t, err) @@ -888,7 +889,7 @@ func TestRestoreFromPartialHistoryAndProcessEvents(t *testing.T) { if lastCommittedBlockHeight > 0 && lastCommittedBlockHeight%snapshotInterval == 0 { ss, err = service.CreateSnapshotAsynchronously(ctx, chainId, lastCommittedBlockHeight) require.NoError(t, err) - waitForSnapshotToComplete(ss) + waitForSnapshotToComplete2(ss, service.Flush) if lastCommittedBlockHeight == 4000 { newSnapshotFileHashAt4000, err = Md5Hash(ss.UnpublishedSnapshotDataDirectory()) @@ -993,7 +994,7 @@ func TestRestoreFromFullHistorySnapshotAndProcessEvents(t *testing.T) { if lastCommittedBlockHeight == 3000 { ss, err := service.CreateSnapshotAsynchronously(ctx, chainId, lastCommittedBlockHeight) require.NoError(t, err) - waitForSnapshotToComplete(ss) + waitForSnapshotToComplete2(ss, service.Flush) snapshotFileHashAfterReloadAt2000AndEventReplayTo3000, err = Md5Hash(ss.UnpublishedSnapshotDataDirectory()) require.NoError(t, err) @@ -1096,7 +1097,7 @@ func TestRestoreFromFullHistorySnapshotWithIndexesAndOrderTriggersAndProcessEven if lastCommittedBlockHeight == 3000 { ss, err := service.CreateSnapshotAsynchronously(ctx, chainId, lastCommittedBlockHeight) require.NoError(t, err) - waitForSnapshotToComplete(ss) + waitForSnapshotToComplete2(ss, service.Flush) snapshotFileHashAfterReloadAt2000AndEventReplayTo3000, err = Md5Hash(ss.UnpublishedSnapshotDataDirectory()) require.NoError(t, err) @@ -1578,6 +1579,36 @@ func waitForSnapshotToComplete(sf segment.Unpublished) { } } +func waitForSnapshotToComplete2(sf segment.Unpublished, flush func()) { + for { + time.Sleep(10 * time.Millisecond) + // wait for snapshot current file + _, err := os.Stat(sf.UnpublishedSnapshotDataDirectory()) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + continue + } else { + panic(err) + } + } + + flush() + + // wait for snapshot data dump in progress file to be removed + + _, err = os.Stat(sf.InProgressFilePath()) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + break + } else { + panic(err) + } + } else { + continue + } + } +} + func decompressEventFile() { sourceFile, err := os.Open(compressedEventsFile) if err != nil { diff --git a/datanode/networkhistory/snapshot/file_worker.go b/datanode/networkhistory/snapshot/file_worker.go new file mode 100644 index 0000000000..df0ce8b947 --- /dev/null +++ b/datanode/networkhistory/snapshot/file_worker.go @@ -0,0 +1,107 @@ +// Copyright (C) 2023 Gobalsky Labs Limited +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package snapshot + +import ( + "bytes" + "fmt" + "os" + "sync" +) + +type bufAndPath struct { + buf *bytes.Buffer + isProgressFile bool + path string +} + +type FileWorker struct { + mu sync.Mutex + queue []*bufAndPath +} + +func NewFileWorker() *FileWorker { + return &FileWorker{ + queue: []*bufAndPath{}, + } +} + +func (fw *FileWorker) Add(buf *bytes.Buffer, path string) { + fw.mu.Lock() + defer fw.mu.Unlock() + + fw.queue = append(fw.queue, &bufAndPath{buf, false, path}) +} + +func (fw *FileWorker) AddLockFile(path string) { + fw.mu.Lock() + defer fw.mu.Unlock() + + fw.queue = append(fw.queue, &bufAndPath{nil, true, path}) +} + +func (fw *FileWorker) peek() (bp *bufAndPath) { + fw.mu.Lock() + defer fw.mu.Unlock() + + if len(fw.queue) <= 0 { + return + } + + bp, fw.queue = fw.queue[0], fw.queue[1:] + + return +} + +func (fw *FileWorker) Empty() bool { + fw.mu.Lock() + defer fw.mu.Unlock() + + return len(fw.queue) <= 0 +} + +func (fw *FileWorker) Consume() error { + bp := fw.peek() + if bp == nil { + return nil // nothing to do + } + + if bp.isProgressFile { + return fw.removeLockFile(bp.path) + } + + return fw.writeSegment(bp) +} + +func (fw *FileWorker) removeLockFile(path string) error { + return os.Remove(path) +} + +func (fw *FileWorker) writeSegment(bp *bufAndPath) error { + file, err := os.Create(bp.path) + if err != nil { + return fmt.Errorf("failed to create file %s : %w", bp.path, err) + } + + defer file.Close() + + _, err = bp.buf.WriteTo(file) + if err != nil { + return fmt.Errorf("couldn't writer to file %v : %w", bp.path, err) + } + + return nil +} diff --git a/datanode/networkhistory/snapshot/service.go b/datanode/networkhistory/snapshot/service.go index f81ae8f8bb..bfebd2d068 100644 --- a/datanode/networkhistory/snapshot/service.go +++ b/datanode/networkhistory/snapshot/service.go @@ -40,6 +40,9 @@ type Service struct { historyStore HistoryStore + fw *FileWorker + tableSnapshotFileSizesCached map[string]int + createSnapshotLock mutex.CtxMutex copyToPath string migrateSchemaUpToVersion func(version int64) error @@ -59,14 +62,16 @@ func NewSnapshotService(log *logging.Logger, config Config, connPool *pgxpool.Po } s := &Service{ - log: log, - config: config, - connPool: connPool, - createSnapshotLock: mutex.New(), - copyToPath: snapshotsCopyToPath, - migrateSchemaUpToVersion: migrateDatabaseToVersion, - migrateSchemaDownToVersion: migrateSchemaDownToVersion, - historyStore: historyStore, + log: log, + config: config, + connPool: connPool, + createSnapshotLock: mutex.New(), + copyToPath: snapshotsCopyToPath, + migrateSchemaUpToVersion: migrateDatabaseToVersion, + migrateSchemaDownToVersion: migrateSchemaDownToVersion, + historyStore: historyStore, + fw: NewFileWorker(), + tableSnapshotFileSizesCached: map[string]int{}, } err = os.MkdirAll(s.copyToPath, fs.ModePerm) @@ -77,6 +82,14 @@ func NewSnapshotService(log *logging.Logger, config Config, connPool *pgxpool.Po return s, nil } +func (b *Service) Flush() { + for !b.fw.Empty() { + if err := b.fw.Consume(); err != nil { + b.log.Error("failed to write all files to disk", logging.Error(err)) + } + } +} + func (b *Service) SnapshotData(ctx context.Context, chainID string, toHeight int64) error { _, err := b.CreateSnapshotAsynchronously(ctx, chainID, toHeight) if err != nil { diff --git a/datanode/networkhistory/snapshot/service_create_snapshot.go b/datanode/networkhistory/snapshot/service_create_snapshot.go index b4d46bb300..c4e6939929 100644 --- a/datanode/networkhistory/snapshot/service_create_snapshot.go +++ b/datanode/networkhistory/snapshot/service_create_snapshot.go @@ -16,6 +16,7 @@ package snapshot import ( + "bytes" "context" "errors" "fmt" @@ -29,7 +30,6 @@ import ( "code.vegaprotocol.io/vega/datanode/networkhistory/segment" "code.vegaprotocol.io/vega/datanode/sqlstore" "code.vegaprotocol.io/vega/libs/fs" - vio "code.vegaprotocol.io/vega/libs/io" "code.vegaprotocol.io/vega/logging" "github.com/georgysavva/scany/pgxscan" @@ -50,7 +50,10 @@ func (b *Service) CreateSnapshotAsynchronously(ctx context.Context, chainID stri return b.createNewSnapshot(ctx, chainID, toHeight, true) } -func (b *Service) createNewSnapshot(ctx context.Context, chainID string, toHeight int64, +func (b *Service) createNewSnapshot( + ctx context.Context, + chainID string, + toHeight int64, async bool, ) (segment.Unpublished, error) { var err error @@ -115,11 +118,12 @@ func (b *Service) createNewSnapshot(ctx context.Context, chainID string, toHeigh runAllInReverseOrder(cleanUp) return segment.Unpublished{}, fmt.Errorf("failed to create write lock file:%w", err) } - cleanUp = append(cleanUp, func() { _ = os.Remove(s.InProgressFilePath()) }) + // cleanUp = append(cleanUp, func() { _ = os.Remove(s.InProgressFilePath()) }) // To ensure reads are isolated from this point forward execute a read on last block _, err = sqlstore.GetLastBlockUsingConnection(ctx, copyDataTx) if err != nil { + _ = os.Remove(s.InProgressFilePath()) runAllInReverseOrder(cleanUp) return segment.Unpublished{}, fmt.Errorf("failed to get last block using connection: %w", err) } @@ -130,6 +134,8 @@ func (b *Service) createNewSnapshot(ctx context.Context, chainID string, toHeigh if err != nil { b.log.Panic("failed to snapshot data", logging.Error(err)) } + + b.fw.AddLockFile(s.InProgressFilePath()) } if async { @@ -241,14 +247,14 @@ func (b *Service) snapshotData(ctx context.Context, tx pgx.Tx, dbMetaData Databa // Write Current State currentSQL := currentStateCopySQL(dbMetaData) - currentRowsCopied, currentStateBytesCopied, err := copyTablesData(ctx, tx, currentSQL, currentStateDir) + currentRowsCopied, currentStateBytesCopied, err := copyTablesData(ctx, tx, currentSQL, currentStateDir, b.fw, b.tableSnapshotFileSizesCached) if err != nil { return fmt.Errorf("failed to copy current state table data:%w", err) } // Write History historySQL := historyCopySQL(dbMetaData, seg) - historyRowsCopied, historyBytesCopied, err := copyTablesData(ctx, tx, historySQL, historyStateDir) + historyRowsCopied, historyBytesCopied, err := copyTablesData(ctx, tx, historySQL, historyStateDir, b.fw, b.tableSnapshotFileSizesCached) if err != nil { return fmt.Errorf("failed to copy history table data:%w", err) } @@ -310,15 +316,25 @@ func historyCopySQL(dbMetaData DatabaseMetadata, segment interface{ GetFromHeigh return copySQL } -func copyTablesData(ctx context.Context, tx pgx.Tx, copySQL []TableCopySql, toDir string) (int64, int64, error) { +func copyTablesData( + ctx context.Context, + tx pgx.Tx, + copySQL []TableCopySql, + toDir string, + fw *FileWorker, + lenCache map[string]int, +) (int64, int64, error) { var totalRowsCopied int64 var totalBytesCopied int64 + for _, tableSql := range copySQL { filePath := path.Join(toDir, tableSql.metaData.Name) - numRowsCopied, bytesCopied, err := writeTableToDataFile(ctx, tx, filePath, tableSql) + // numRowsCopied, bytesCopied, err := writeTableToDataFile(ctx, tx, filePath, tableSql) + numRowsCopied, bytesCopied, err := extractTableData(ctx, tx, filePath, tableSql, fw, lenCache) if err != nil { return 0, 0, fmt.Errorf("failed to write table %s to file %s:%w", tableSql.metaData.Name, filePath, err) } + totalRowsCopied += numRowsCopied totalBytesCopied += bytesCopied } @@ -326,20 +342,45 @@ func copyTablesData(ctx context.Context, tx pgx.Tx, copySQL []TableCopySql, toDi return totalRowsCopied, totalBytesCopied, nil } -func writeTableToDataFile(ctx context.Context, tx pgx.Tx, filePath string, tableSql TableCopySql) (int64, int64, error) { - file, err := os.Create(filePath) - if err != nil { - return 0, 0, fmt.Errorf("failed to create file %s:%w", filePath, err) +func extractTableData( + ctx context.Context, + tx pgx.Tx, + filePath string, + tableSql TableCopySql, + fw *FileWorker, + lenCache map[string]int, +) (int64, int64, error) { + allocCap := lenCache[tableSql.metaData.Name] + + fmt.Printf("DEBUGTEMP: %v - initialAllocCap(%v)\n", tableSql.metaData.Name, allocCap) + + if allocCap == 0 { + allocCap = 1000000 // roughly 1mb, because why not. + } else { + // if we already have something cached maybe this is growing + // a grow of 30% is not unreasonnable? + // should leave us some room + allocCap += allocCap / 3 } - defer file.Close() - fileWriter := vio.NewCountWriter(file) + fmt.Printf("DEBUGTEMP: %v - finalAllocCap(%v)", tableSql.metaData.Name, allocCap) + + b := bytes.NewBuffer(make([]byte, 0, allocCap)) - numRowsCopied, err := executeCopy(ctx, tx, tableSql, fileWriter) + numRowsCopied, err := executeCopy(ctx, tx, tableSql, b) if err != nil { return 0, 0, fmt.Errorf("failed to execute copy: %w", err) } - return numRowsCopied, fileWriter.Count(), nil + + len := int64(b.Len()) + // schedule it + fw.Add(b, filePath) + + // save the new len for this table + lenCache[tableSql.metaData.Name] = int(len) + fmt.Printf("DEBUGTEMP: %v - actualLen(%v)\n", tableSql.metaData.Name, len) + + return numRowsCopied, len, nil } func executeCopy(ctx context.Context, tx pgx.Tx, tableSql TableCopySql, w io.Writer) (int64, error) { diff --git a/datanode/networkhistory/snapshot/service_create_snapshot_test.go b/datanode/networkhistory/snapshot/service_create_snapshot_test.go index a887102467..6e43b1a68f 100644 --- a/datanode/networkhistory/snapshot/service_create_snapshot_test.go +++ b/datanode/networkhistory/snapshot/service_create_snapshot_test.go @@ -28,6 +28,7 @@ import ( func TestGetHistorySnapshots(t *testing.T) { snapshotsDir := t.TempDir() + service, err := snapshot.NewSnapshotService(logging.NewTestLogger(), snapshot.NewDefaultConfig(), nil, nil, snapshotsDir, nil, nil) if err != nil { panic(err)