From 3919fae2acde2cebdf332cc73e214f38bee67f5e Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Fri, 28 Jan 2022 15:05:21 -0700 Subject: [PATCH 01/16] Use sqlite3_file_control(SQLITE_FCNTL_PERSIST_WAL) to persist WAL Previously, Litestream would avoid closing the SQLite3 connection in order to ensure that the WAL file was not cleaned up by the database if it was the last connection. This commit changes the behavior by introducing a file control call to perform the same action. This allows us to close the database file normally in all cases. --- cmd/litestream/replicate.go | 2 +- db.go | 28 ++++++++-------------------- db_test.go | 4 ++-- litestream.go | 21 +++++++++++++++++++++ 4 files changed, 32 insertions(+), 23 deletions(-) diff --git a/cmd/litestream/replicate.go b/cmd/litestream/replicate.go index 7c0403bf..19731838 100644 --- a/cmd/litestream/replicate.go +++ b/cmd/litestream/replicate.go @@ -178,7 +178,7 @@ func (c *ReplicateCommand) Run(ctx context.Context) (err error) { // Close closes all open databases. func (c *ReplicateCommand) Close() (err error) { for _, db := range c.DBs { - if e := db.SoftClose(); e != nil { + if e := db.Close(); e != nil { log.Printf("error closing db: path=%s err=%s", db.Path(), e) if err == nil { err = e diff --git a/db.go b/db.go index dd33d7e5..b7a31eb1 100644 --- a/db.go +++ b/db.go @@ -291,20 +291,9 @@ func (db *DB) Open() (err error) { return nil } -// Close releases the read lock & closes the database. This method should only -// be called by tests as it causes the underlying database to be checkpointed. +// Close flushes outstanding WAL writes to replicas, releases the read lock, +// and closes the database. func (db *DB) Close() (err error) { - return db.close(false) -} - -// SoftClose closes everything but the underlying db connection. This method -// is available because the binary needs to avoid closing the database on exit -// to prevent autocheckpointing. -func (db *DB) SoftClose() (err error) { - return db.close(true) -} - -func (db *DB) close(soft bool) (err error) { db.cancel() db.wg.Wait() @@ -325,7 +314,7 @@ func (db *DB) close(soft bool) (err error) { err = e } } - r.Stop(!soft) + r.Stop(true) } // Release the read lock to allow other applications to handle checkpointing. @@ -335,9 +324,7 @@ func (db *DB) close(soft bool) (err error) { } } - // Only perform full close if this is not a soft close. - // This closes the underlying database connection which can clean up the WAL. - if !soft && db.db != nil { + if db.db != nil { if e := db.db.Close(); e != nil && err == nil { err = e } @@ -392,8 +379,9 @@ func (db *DB) init() (err error) { dsn := db.path dsn += fmt.Sprintf("?_busy_timeout=%d", BusyTimeout.Milliseconds()) - // Connect to SQLite database. - if db.db, err = sql.Open("sqlite3", dsn); err != nil { + // Connect to SQLite database. Use the driver registered with a hook to + // prevent WAL files from being removed. + if db.db, err = sql.Open("litestream-sqlite3", dsn); err != nil { return err } @@ -1419,7 +1407,7 @@ func applyWAL(ctx context.Context, index int, dbPath string) error { } // Open SQLite database and force a truncating checkpoint. - d, err := sql.Open("sqlite3", dbPath) + d, err := sql.Open("litestream-sqlite3", dbPath) if err != nil { return err } diff --git a/db_test.go b/db_test.go index b7eb54b0..ef82987a 100644 --- a/db_test.go +++ b/db_test.go @@ -270,8 +270,8 @@ func TestDB_Sync(t *testing.T) { t.Fatal(err) } - // Verify WAL does not exist. 
- if _, err := os.Stat(db.WALPath()); !os.IsNotExist(err) { + // Remove WAL file. + if err := os.Remove(db.WALPath()); err != nil { t.Fatal(err) } diff --git a/litestream.go b/litestream.go index c895ac39..3fa34742 100644 --- a/litestream.go +++ b/litestream.go @@ -13,6 +13,8 @@ import ( "strconv" "strings" "time" + + "github.com/mattn/go-sqlite3" ) // Naming constants. @@ -42,6 +44,25 @@ var ( ErrChecksumMismatch = errors.New("invalid replica, checksum mismatch") ) +var ( + // LogWriter is the destination writer for all logging. + LogWriter = os.Stdout + + // LogFlags are the flags passed to log.New(). + LogFlags = 0 +) + +func init() { + sql.Register("litestream-sqlite3", &sqlite3.SQLiteDriver{ + ConnectHook: func(conn *sqlite3.SQLiteConn) error { + if err := conn.SetFileControlInt("main", sqlite3.SQLITE_FCNTL_PERSIST_WAL, 1); err != nil { + return fmt.Errorf("cannot set file control: %w", err) + } + return nil + }, + }) +} + // SnapshotIterator represents an iterator over a collection of snapshot metadata. type SnapshotIterator interface { io.Closer From e8c6a25f006edf354a8de2f24d1d16784f30868a Mon Sep 17 00:00:00 2001 From: Toni Spets Date: Sat, 20 Aug 2022 13:07:12 +0300 Subject: [PATCH 02/16] Beeper pipeline --- .github/workflows/deploy.yaml | 47 +++++++++++++ .github/workflows/release.docker.yml | 51 -------------- .github/workflows/release.linux.yml | 81 ---------------------- .github/workflows/release.linux_static.yml | 62 ----------------- .github/workflows/test.yml | 62 ----------------- 5 files changed, 47 insertions(+), 256 deletions(-) create mode 100644 .github/workflows/deploy.yaml delete mode 100644 .github/workflows/release.docker.yml delete mode 100644 .github/workflows/release.linux.yml delete mode 100644 .github/workflows/release.linux_static.yml delete mode 100644 .github/workflows/test.yml diff --git a/.github/workflows/deploy.yaml b/.github/workflows/deploy.yaml new file mode 100644 index 00000000..14a91043 --- /dev/null +++ b/.github/workflows/deploy.yaml @@ -0,0 +1,47 @@ +name: Build and Deploy + +on: + push: + +env: + CI_REGISTRY_IMAGE: "${{ secrets.CI_REGISTRY }}/litestream" + +jobs: + build-docker: + runs-on: ubuntu-latest + steps: + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v2 + + - name: Login to registry + uses: docker/login-action@v2 + with: + registry: ${{ secrets.CI_REGISTRY }} + username: ${{ secrets.CI_REGISTRY_USER }} + password: ${{ secrets.CI_REGISTRY_PASSWORD }} + + - name: Docker Build + uses: docker/build-push-action@v2 + with: + cache-from: ${{ env.CI_REGISTRY_IMAGE }}:latest + pull: true + file: Dockerfile + tags: ${{ env.CI_REGISTRY_IMAGE }}:${{ github.sha }} + push: true + + deploy-docker: + runs-on: ubuntu-latest + needs: + - build-docker + if: github.ref == 'refs/heads/beeper' + steps: + - name: Login to registry + uses: docker/login-action@v2 + with: + registry: ${{ secrets.CI_REGISTRY }} + username: ${{ secrets.CI_REGISTRY_USER }} + password: ${{ secrets.CI_REGISTRY_PASSWORD }} + + - uses: beeper/docker-retag-push-latest@main + with: + image: ${{ env.CI_REGISTRY_IMAGE }} diff --git a/.github/workflows/release.docker.yml b/.github/workflows/release.docker.yml deleted file mode 100644 index bf492451..00000000 --- a/.github/workflows/release.docker.yml +++ /dev/null @@ -1,51 +0,0 @@ -on: - release: - types: - - published - pull_request: - types: - - opened - - synchronize - - reopened - branches-ignore: - - "dependabot/**" - -name: Release (Docker) -jobs: - docker: - runs-on: ubuntu-latest - env: - PLATFORMS: 
"linux/amd64,linux/arm64,linux/arm/v7" - VERSION: "${{ github.event_name == 'release' && github.event.release.name || github.sha }}" - - steps: - - uses: actions/checkout@v2 - - uses: docker/setup-qemu-action@v1 - - uses: docker/setup-buildx-action@v1 - - - uses: docker/login-action@v1 - with: - username: benbjohnson - password: ${{ secrets.DOCKERHUB_TOKEN }} - - - id: meta - uses: docker/metadata-action@v3 - with: - images: litestream/litestream - tags: | - type=ref,event=branch - type=ref,event=pr - type=sha - type=sha,format=long - type=semver,pattern={{version}} - type=semver,pattern={{major}}.{{minor}} - - - uses: docker/build-push-action@v2 - with: - context: . - push: true - platforms: ${{ env.PLATFORMS }} - tags: ${{ steps.meta.outputs.tags }} - labels: ${{ steps.meta.outputs.labels }} - build-args: | - LITESTREAM_VERSION=${{ env.VERSION }} \ No newline at end of file diff --git a/.github/workflows/release.linux.yml b/.github/workflows/release.linux.yml deleted file mode 100644 index ce80a956..00000000 --- a/.github/workflows/release.linux.yml +++ /dev/null @@ -1,81 +0,0 @@ -on: - release: - types: - - created - -name: release (linux) -jobs: - build: - runs-on: ubuntu-18.04 - strategy: - matrix: - include: - - arch: amd64 - cc: gcc - - arch: arm64 - cc: aarch64-linux-gnu-gcc - - arch: arm - arm: 6 - cc: arm-linux-gnueabi-gcc - - arch: arm - arm: 7 - cc: arm-linux-gnueabihf-gcc - - env: - GOOS: linux - GOARCH: ${{ matrix.arch }} - GOARM: ${{ matrix.arm }} - CC: ${{ matrix.cc }} - - steps: - - uses: actions/checkout@v2 - - uses: actions/setup-go@v2 - with: - go-version: '1.16' - - - id: release - uses: bruceadams/get-release@v1.2.2 - env: - GITHUB_TOKEN: ${{ github.token }} - - - name: Install cross-compilers - run: | - sudo apt-get update - sudo apt-get install -y gcc-aarch64-linux-gnu gcc-arm-linux-gnueabihf gcc-arm-linux-gnueabi - - - name: Install nfpm - run: | - wget https://github.com/goreleaser/nfpm/releases/download/v2.2.3/nfpm_2.2.3_Linux_x86_64.tar.gz - tar zxvf nfpm_2.2.3_Linux_x86_64.tar.gz - - - name: Build litestream - run: | - rm -rf dist - mkdir -p dist - cp etc/litestream.yml etc/litestream.service dist - cat etc/nfpm.yml | LITESTREAM_VERSION=${{ steps.release.outputs.tag_name }} envsubst > dist/nfpm.yml - CGO_ENABLED=1 go build -ldflags "-s -w -X 'main.Version=${{ steps.release.outputs.tag_name }}'" -o dist/litestream ./cmd/litestream - - cd dist - tar -czvf litestream-${{ steps.release.outputs.tag_name }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}.tar.gz litestream - ../nfpm pkg --config nfpm.yml --packager deb --target litestream-${{ steps.release.outputs.tag_name }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}.deb - - - name: Upload release tarball - uses: actions/upload-release-asset@v1.0.2 - env: - GITHUB_TOKEN: ${{ github.token }} - with: - upload_url: ${{ steps.release.outputs.upload_url }} - asset_path: ./dist/litestream-${{ steps.release.outputs.tag_name }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}.tar.gz - asset_name: litestream-${{ steps.release.outputs.tag_name }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}.tar.gz - asset_content_type: application/gzip - - - name: Upload debian package - uses: actions/upload-release-asset@v1.0.2 - env: - GITHUB_TOKEN: ${{ github.token }} - with: - upload_url: ${{ steps.release.outputs.upload_url }} - asset_path: ./dist/litestream-${{ steps.release.outputs.tag_name }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}.deb - asset_name: litestream-${{ steps.release.outputs.tag_name }}-${{ env.GOOS 
}}-${{ env.GOARCH }}${{ env.GOARM }}.deb - asset_content_type: application/octet-stream diff --git a/.github/workflows/release.linux_static.yml b/.github/workflows/release.linux_static.yml deleted file mode 100644 index ddc90b9a..00000000 --- a/.github/workflows/release.linux_static.yml +++ /dev/null @@ -1,62 +0,0 @@ -on: - release: - types: - - created - -name: release (linux/static) -jobs: - build: - runs-on: ubuntu-18.04 - strategy: - matrix: - include: - - arch: amd64 - cc: gcc - - arch: arm64 - cc: aarch64-linux-gnu-gcc - - arch: arm - arm: 6 - cc: arm-linux-gnueabi-gcc - - arch: arm - arm: 7 - cc: arm-linux-gnueabihf-gcc - - env: - GOOS: linux - GOARCH: ${{ matrix.arch }} - GOARM: ${{ matrix.arm }} - CC: ${{ matrix.cc }} - - steps: - - uses: actions/checkout@v2 - - uses: actions/setup-go@v2 - with: - go-version: '1.16' - - - id: release - uses: bruceadams/get-release@v1.2.2 - env: - GITHUB_TOKEN: ${{ github.token }} - - - name: Install cross-compilers - run: | - sudo apt-get update - sudo apt-get install -y gcc-aarch64-linux-gnu gcc-arm-linux-gnueabihf gcc-arm-linux-gnueabi - - - name: Build litestream - run: | - rm -rf dist - mkdir -p dist - CGO_ENABLED=1 go build -ldflags "-s -w -extldflags "-static" -X 'main.Version=${{ steps.release.outputs.tag_name }}'" -tags osusergo,netgo,sqlite_omit_load_extension -o dist/litestream ./cmd/litestream - cd dist - tar -czvf litestream-${{ steps.release.outputs.tag_name }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}-static.tar.gz litestream - - - name: Upload release tarball - uses: actions/upload-release-asset@v1.0.2 - env: - GITHUB_TOKEN: ${{ github.token }} - with: - upload_url: ${{ steps.release.outputs.upload_url }} - asset_path: ./dist/litestream-${{ steps.release.outputs.tag_name }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}-static.tar.gz - asset_name: litestream-${{ steps.release.outputs.tag_name }}-${{ env.GOOS }}-${{ env.GOARCH }}${{ env.GOARM }}-static.tar.gz - asset_content_type: application/gzip diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml deleted file mode 100644 index d2a5eddc..00000000 --- a/.github/workflows/test.yml +++ /dev/null @@ -1,62 +0,0 @@ -on: push -name: test -jobs: - test: - runs-on: ubuntu-18.04 - steps: - - uses: actions/setup-go@v2 - with: - go-version: '1.16' - - - uses: actions/checkout@v2 - - - uses: actions/cache@v2 - with: - path: ~/go/pkg/mod - key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }} - restore-keys: | - ${{ runner.os }}-go- - - - name: Extract GCP credentials - run: 'echo "$GOOGLE_APPLICATION_CREDENTIALS" > /opt/gcp.json' - shell: bash - env: - GOOGLE_APPLICATION_CREDENTIALS: ${{secrets.GOOGLE_APPLICATION_CREDENTIALS}} - - - name: Extract SSH key - run: 'echo "$LITESTREAM_SFTP_KEY" > /opt/id_ed25519' - shell: bash - env: - LITESTREAM_SFTP_KEY: ${{secrets.LITESTREAM_SFTP_KEY}} - - - name: Run unit tests - run: go test -v ./... - - - name: Run aws s3 tests - run: go test -v -run=TestReplicaClient . -integration s3 - env: - LITESTREAM_S3_ACCESS_KEY_ID: ${{ secrets.LITESTREAM_S3_ACCESS_KEY_ID }} - LITESTREAM_S3_SECRET_ACCESS_KEY: ${{ secrets.LITESTREAM_S3_SECRET_ACCESS_KEY }} - LITESTREAM_S3_REGION: us-east-1 - LITESTREAM_S3_BUCKET: integration.litestream.io - - - name: Run google cloud storage (gcs) tests - run: go test -v -run=TestReplicaClient . -integration gcs - env: - GOOGLE_APPLICATION_CREDENTIALS: /opt/gcp.json - LITESTREAM_GCS_BUCKET: integration.litestream.io - - - name: Run azure blob storage (abs) tests - run: go test -v -run=TestReplicaClient . 
-integration abs - env: - LITESTREAM_ABS_ACCOUNT_NAME: ${{ secrets.LITESTREAM_ABS_ACCOUNT_NAME }} - LITESTREAM_ABS_ACCOUNT_KEY: ${{ secrets.LITESTREAM_ABS_ACCOUNT_KEY }} - LITESTREAM_ABS_BUCKET: integration - - - name: Run sftp tests - run: go test -v -run=TestReplicaClient . -integration sftp - env: - LITESTREAM_SFTP_HOST: litestream-test-sftp.fly.dev:2222 - LITESTREAM_SFTP_USER: litestream - LITESTREAM_SFTP_KEY_PATH: /opt/id_ed25519 - LITESTREAM_SFTP_PATH: /litestream From b9ed66e31fe5b0931e3e2cacb3faf93fa75017de Mon Sep 17 00:00:00 2001 From: Toni Spets Date: Tue, 23 Aug 2022 14:15:22 +0300 Subject: [PATCH 03/16] Add sqlite binary for db access --- Dockerfile | 1 + 1 file changed, 1 insertion(+) diff --git a/Dockerfile b/Dockerfile index c0dd0cc0..c4c37c8c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -11,6 +11,7 @@ RUN --mount=type=cache,target=/root/.cache/go-build \ FROM alpine +RUN apk add --no-cache sqlite # for debugging COPY --from=builder /usr/local/bin/litestream /usr/local/bin/litestream ENTRYPOINT ["/usr/local/bin/litestream"] CMD [] From 24894ced63752273eb81ed3d1cfc64c19da70ebc Mon Sep 17 00:00:00 2001 From: Toni Spets Date: Thu, 8 Sep 2022 18:09:20 +0300 Subject: [PATCH 04/16] Build with go 1.19 --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index c4c37c8c..e44fc7ea 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.17 as builder +FROM golang:1.19 as builder WORKDIR /src/litestream COPY . . From 3e473a4cf4428bb5c2160a14f7741b0317d32524 Mon Sep 17 00:00:00 2001 From: Toni Spets Date: Mon, 3 Oct 2022 08:29:43 +0300 Subject: [PATCH 05/16] Always verbose until backend update --- cmd/litestream/restore.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/litestream/restore.go b/cmd/litestream/restore.go index 28c20fc1..e223c34e 100644 --- a/cmd/litestream/restore.go +++ b/cmd/litestream/restore.go @@ -31,7 +31,7 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) { ifDBNotExists := fs.Bool("if-db-not-exists", false, "") ifReplicaExists := fs.Bool("if-replica-exists", false, "") timestampStr := fs.String("timestamp", "", "timestamp") - verbose := fs.Bool("v", false, "verbose output") + verbose := fs.Bool("v", true, "verbose output") fs.Usage = c.Usage if err := fs.Parse(args); err != nil { return err From 8a8ec8b1cb7b974fa5b0317d40488880abedc8c4 Mon Sep 17 00:00:00 2001 From: Toni Spets Date: Mon, 3 Oct 2022 09:30:57 +0300 Subject: [PATCH 06/16] Improved restore/sync logging --- replica.go | 35 +++++++++++++++++++++++++---------- 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/replica.go b/replica.go index 3cf81f27..8e976c63 100644 --- a/replica.go +++ b/replica.go @@ -213,6 +213,11 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) { // Obtain initial position from shadow reader. // It may have moved to the next index if previous position was at the end. pos := rd.Pos() + initialPos := pos + startTime := time.Now() + var bytesWritten int + + log.Printf("%s(%s): write wal segment %s/%08x:%08x", r.db.Path(), r.Name(), initialPos.Generation, initialPos.Index, initialPos.Offset) // Copy through pipe into client from the starting position. var g errgroup.Group @@ -242,6 +247,7 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) { return err } walBytesCounter.Add(float64(n)) + bytesWritten += n } // Copy frames. 
@@ -268,6 +274,7 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) { return err } walBytesCounter.Add(float64(n)) + bytesWritten += n } // Flush LZ4 writer and close pipe. @@ -291,6 +298,7 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) { replicaWALIndexGaugeVec.WithLabelValues(r.db.Path(), r.Name()).Set(float64(rd.Pos().Index)) replicaWALOffsetGaugeVec.WithLabelValues(r.db.Path(), r.Name()).Set(float64(rd.Pos().Offset)) + log.Printf("%s(%s): wal segment written %s/%08x:%08x elapsed=%s sz=%d", r.db.Path(), r.Name(), initialPos.Generation, initialPos.Index, initialPos.Offset, time.Since(startTime).String(), bytesWritten) return nil } @@ -483,6 +491,9 @@ func (r *Replica) Snapshot(ctx context.Context) (info SnapshotInfo, err error) { return pw.Close() }) + log.Printf("%s(%s): write snapshot %s/%08x", r.db.Path(), r.Name(), pos.Generation, pos.Index) + startTime := time.Now() + // Delegate write to client & wait for writer goroutine to finish. if info, err = r.Client.WriteSnapshot(ctx, pos.Generation, pos.Index, pr); err != nil { return info, err @@ -490,7 +501,7 @@ func (r *Replica) Snapshot(ctx context.Context) (info SnapshotInfo, err error) { return info, err } - log.Printf("%s(%s): snapshot written %s/%08x", r.db.Path(), r.Name(), pos.Generation, pos.Index) + log.Printf("%s(%s): snapshot written %s/%08x elapsed=%s sz=%d", r.db.Path(), r.Name(), pos.Generation, pos.Index, time.Since(startTime).String(), info.Size) return info, nil } @@ -1019,9 +1030,12 @@ func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error) { // Copy snapshot to output path. logger.Printf("%s: restoring snapshot %s/%08x to %s", logPrefix, opt.Generation, minWALIndex, tmpPath) - if err := r.restoreSnapshot(ctx, pos.Generation, pos.Index, tmpPath); err != nil { + startTime := time.Now() + bytes, err := r.restoreSnapshot(ctx, pos.Generation, pos.Index, tmpPath) + if err != nil { return fmt.Errorf("cannot restore snapshot: %w", err) } + logger.Printf("%s: restored snapshot %s/%08x elapsed=%s sz=%d (uncompressed)", logPrefix, opt.Generation, minWALIndex, time.Since(startTime).String(), bytes) // If no WAL files available, move snapshot to final path & exit early. if snapshotOnly { @@ -1236,7 +1250,7 @@ func (r *Replica) walSegmentMap(ctx context.Context, generation string, maxIndex } // restoreSnapshot copies a snapshot from the replica to a file. -func (r *Replica) restoreSnapshot(ctx context.Context, generation string, index int, filename string) error { +func (r *Replica) restoreSnapshot(ctx context.Context, generation string, index int, filename string) (int64, error) { // Determine the user/group & mode based on the DB, if available. 
var fileInfo, dirInfo os.FileInfo if db := r.DB(); db != nil { @@ -1244,27 +1258,28 @@ func (r *Replica) restoreSnapshot(ctx context.Context, generation string, index } if err := internal.MkdirAll(filepath.Dir(filename), dirInfo); err != nil { - return err + return 0, err } f, err := internal.CreateFile(filename, fileInfo) if err != nil { - return err + return 0, err } defer f.Close() rd, err := r.Client.SnapshotReader(ctx, generation, index) if err != nil { - return err + return 0, err } defer rd.Close() - if _, err := io.Copy(f, lz4.NewReader(rd)); err != nil { - return err + if bytes, err := io.Copy(f, lz4.NewReader(rd)); err != nil { + return 0, err } else if err := f.Sync(); err != nil { - return err + return 0, err + } else { + return bytes, f.Close() } - return f.Close() } // downloadWAL copies a WAL file from the replica to a local copy next to the DB. From 1670bb5640fd947f12684bb31de0ea46b1d9e8ca Mon Sep 17 00:00:00 2001 From: Toni Spets Date: Tue, 4 Oct 2022 07:42:58 +0300 Subject: [PATCH 07/16] Early initialize database Workaround for #436 --- db.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/db.go b/db.go index b7a31eb1..d604fbbd 100644 --- a/db.go +++ b/db.go @@ -288,6 +288,9 @@ func (db *DB) Open() (err error) { go func() { defer db.wg.Done(); db.monitor() }() } + // Try to initialize the database early, allow failing. + db.init() + return nil } From ac9ad40d7154a6d282efc5ca8f26788daa871434 Mon Sep 17 00:00:00 2001 From: Toni Spets Date: Mon, 17 Oct 2022 10:28:33 +0300 Subject: [PATCH 08/16] Add age encryption support --- cmd/litestream/main.go | 21 +++++++++++++ go.mod | 3 +- go.sum | 9 ++++++ replica.go | 71 +++++++++++++++++++++++++++++++++++++++--- 4 files changed, 99 insertions(+), 5 deletions(-) diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index d186f61f..e451746a 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -17,6 +17,7 @@ import ( "strings" "time" + "filippo.io/age" "github.com/benbjohnson/litestream" "github.com/benbjohnson/litestream/abs" "github.com/benbjohnson/litestream/file" @@ -323,6 +324,10 @@ type ReplicaConfig struct { User string `yaml:"user"` Password string `yaml:"password"` KeyPath string `yaml:"key-path"` + + // Encryption identities and recipients + AgeIdentities []string `yaml:"age-identities"` + AgeRecipients []string `yaml:"age-recipients"` } // NewReplicaFromConfig instantiates a replica for a DB based on a config. @@ -349,6 +354,22 @@ func NewReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (_ *litestream.Re if v := c.ValidationInterval; v != nil { r.ValidationInterval = *v } + for _, str := range c.AgeIdentities { + identities, err := age.ParseIdentities(strings.NewReader(str)) + if err != nil { + return nil, err + } + + r.AgeIdentities = append(r.AgeIdentities, identities...) + } + for _, str := range c.AgeRecipients { + recipients, err := age.ParseRecipients(strings.NewReader(str)) + if err != nil { + return nil, err + } + + r.AgeRecipients = append(r.AgeRecipients, recipients...) + } // Build and set client on replica. 
switch c.ReplicaType() { diff --git a/go.mod b/go.mod index 928c4d39..50f640de 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.16 require ( cloud.google.com/go/storage v1.15.0 + filippo.io/age v1.0.0 github.com/Azure/azure-storage-blob-go v0.13.0 github.com/aws/aws-sdk-go v1.27.0 github.com/mattn/go-shellwords v1.0.11 @@ -11,7 +12,7 @@ require ( github.com/pierrec/lz4/v4 v4.1.15 github.com/pkg/sftp v1.13.0 github.com/prometheus/client_golang v1.12.2 - golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a + golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 google.golang.org/api v0.45.0 diff --git a/go.sum b/go.sum index 4cc7c984..4f3ff783 100644 --- a/go.sum +++ b/go.sum @@ -39,6 +39,9 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9 cloud.google.com/go/storage v1.15.0 h1:Ljj+ZXVEhCr/1+4ZhvtteN1ND7UUsNTlduGclLh8GO0= cloud.google.com/go/storage v1.15.0/go.mod h1:mjjQMoxxyGH7Jr8K5qrx6N2O0AHsczI61sMNn03GIZI= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= +filippo.io/age v1.0.0 h1:V6q14n0mqYU3qKFkZ6oOaF9oXneOviS3ubXsSVBRSzc= +filippo.io/age v1.0.0/go.mod h1:PaX+Si/Sd5G8LgfCwldsSba3H1DDQZhIhFGkhbHaBq8= +filippo.io/edwards25519 v1.0.0-rc.1/go.mod h1:N1IkdkCkiLB6tki+MYJoSx2JTY9NUlxZE7eHn5EwJns= github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U= github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k= github.com/Azure/azure-storage-blob-go v0.13.0 h1:lgWHvFh+UYBNVQLFHXkvul2f6yOPA9PIH82RTG2cSwc= @@ -276,6 +279,8 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a h1:kr2P4QFmQr29mSLA43kwrOcgcReGTfbE9N577tCTuBc= golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= +golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 h1:HWj/xjIHfjYU5nVXpTM0s39J9CbLn7Cc5a7IC5rwsMQ= +golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -424,11 +429,15 @@ golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210412220455-f1c623a9e750/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210903071746-97244b99971b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 h1:XfKQ4OlFl8okEOr5UvAqFRVj8pY/4yfcXrddB8qAbU0= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term 
v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 h1:v+OssWQX+hTHEmOBgwxdZxK4zHq3yOs8F9J7mk0PY8E=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
+golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b h1:9zKuko04nR4gjZ4+DNjHqRlAJqbJETHwiNKDqTfOjfE=
+golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
 golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
diff --git a/replica.go b/replica.go
index 8e976c63..d3531f36 100644
--- a/replica.go
+++ b/replica.go
@@ -15,6 +15,7 @@ import (
 	"sync"
 	"time"
 
+	"filippo.io/age"
 	"github.com/benbjohnson/litestream/internal"
 	"github.com/pierrec/lz4/v4"
 	"github.com/prometheus/client_golang/prometheus"
@@ -67,6 +68,10 @@ type Replica struct {
 	// If true, replica monitors database for changes automatically.
 	// Set to false if replica is being used synchronously (such as in tests).
 	MonitorEnabled bool
+
+	// Encryption identities and recipients
+	AgeIdentities []age.Identity
+	AgeRecipients []age.Recipient
 }
 
 func NewReplica(db *DB, name string) *Replica {
@@ -226,8 +231,20 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) {
 		return err
 	})
 
+	var ew io.WriteCloser = pw
+
+	// Add encryption if we have recipients. Check the error before
+	// deferring the close so a nil writer is never deferred.
+	if len(r.AgeRecipients) > 0 {
+		var err error
+		ew, err = age.Encrypt(pw, r.AgeRecipients...)
+		if err != nil {
+			return err
+		}
+		defer ew.Close()
+	}
+
 	// Wrap writer to LZ4 compress.
-	zw := lz4.NewWriter(pw)
+	zw := lz4.NewWriter(ew)
 
 	// Track total WAL bytes written to replica client.
 	walBytesCounter := replicaWALBytesCounterVec.WithLabelValues(r.db.Path(), r.Name())
@@ -277,9 +294,11 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) {
 		bytesWritten += n
 	}
 
-	// Flush LZ4 writer and close pipe.
+	// Flush LZ4 writer, encryption writer and close pipe.
 	if err := zw.Close(); err != nil {
 		return err
+	} else if err := ew.Close(); err != nil {
+		return err
 	} else if err := pw.Close(); err != nil {
 		return err
 	}
@@ -342,6 +361,15 @@ func (r *Replica) calcPos(ctx context.Context, generation string) (pos Pos, err
 	}
 	defer rd.Close()
 
+	if len(r.AgeIdentities) > 0 {
+		drd, err := age.Decrypt(rd, r.AgeIdentities...)
+		if err != nil {
+			return pos, err
+		}
+
+		rd = ioutil.NopCloser(drd)
+	}
+
 	n, err := io.Copy(ioutil.Discard, lz4.NewReader(rd))
 	if err != nil {
 		return pos, err
@@ -478,7 +506,23 @@ func (r *Replica) Snapshot(ctx context.Context) (info SnapshotInfo, err error) {
 	// Copy the database file to the LZ4 writer in a separate goroutine.
 	var g errgroup.Group
 	g.Go(func() error {
-		zr := lz4.NewWriter(pw)
+		var wc io.WriteCloser = pw
+
+		// Add encryption if we have recipients.
+		if len(r.AgeRecipients) > 0 {
+			// We need to ensure the pipe is closed.
+			defer pw.Close()
+
+			var err error
+			wc, err = age.Encrypt(pw, r.AgeRecipients...) 
+			if err != nil {
+				pw.CloseWithError(err)
+				return err
+			}
+			defer wc.Close()
+		}
+
+		zr := lz4.NewWriter(wc)
 		defer zr.Close()
 
 		if _, err := io.Copy(zr, r.f); err != nil {
@@ -488,7 +532,7 @@ func (r *Replica) Snapshot(ctx context.Context) (info SnapshotInfo, err error) {
 			pw.CloseWithError(err)
 			return err
 		}
-		return pw.Close()
+		return wc.Close()
 	})
 
 	log.Printf("%s(%s): write snapshot %s/%08x", r.db.Path(), r.Name(), pos.Generation, pos.Index)
 	startTime := time.Now()
@@ -1273,6 +1317,15 @@ func (r *Replica) restoreSnapshot(ctx context.Context, generation string, index
 	}
 	defer rd.Close()
 
+	if len(r.AgeIdentities) > 0 {
+		drd, err := age.Decrypt(rd, r.AgeIdentities...)
+		if err != nil {
+			return 0, err
+		}
+
+		rd = ioutil.NopCloser(drd)
+	}
+
 	if bytes, err := io.Copy(f, lz4.NewReader(rd)); err != nil {
 		return 0, err
 	} else if err := f.Sync(); err != nil {
@@ -1300,6 +1353,16 @@ func (r *Replica) downloadWAL(ctx context.Context, generation string, index int,
 		return err
 	}
 	defer rd.Close()
+
+	if len(r.AgeIdentities) > 0 {
+		drd, err := age.Decrypt(rd, r.AgeIdentities...)
+		if err != nil {
+			return err
+		}
+
+		rd = ioutil.NopCloser(drd)
+	}
+
 	readers = append(readers, lz4.NewReader(rd))
 }

From cb44be6d5418a227e88b26501f3d1e485ed7b317 Mon Sep 17 00:00:00 2001
From: Toni Spets
Date: Thu, 20 Oct 2022 14:34:15 +0300
Subject: [PATCH 09/16] Force truncation checkpoint if WAL becomes runaway

A failsafe to force WAL truncation if the WAL somehow grows to extreme
sizes between normal checkpoints. The default threshold is around 200
megabytes with a 4k page size.
---
 db.go | 76 +++++++++++++++++++++++++++++++++++++----------------------
 1 file changed, 48 insertions(+), 28 deletions(-)

diff --git a/db.go b/db.go
index d604fbbd..1f952da2 100644
--- a/db.go
+++ b/db.go
@@ -31,6 +31,7 @@ const (
 	DefaultCheckpointInterval = 1 * time.Minute
 	DefaultMinCheckpointPageN = 1000
 	DefaultMaxCheckpointPageN = 10000
+	DefaultTruncatePageN      = 500000
 )
 
 // MaxIndex is the maximum possible WAL index.
@@ -83,6 +84,16 @@ type DB struct {
 	// unbounded if there are always read transactions occurring.
 	MaxCheckpointPageN int
 
+	// Threshold of WAL size, in pages, before a forced truncation checkpoint.
+	// A forced truncation checkpoint will block new transactions and wait for
+	// existing transactions to finish before issuing a checkpoint and
+	// truncating the WAL.
+	//
+	// If zero, no truncates are forced. This can cause the WAL to grow
+	// unbounded if there's a sudden spike of changes between other
+	// checkpoints.
+	TruncatePageN int
+
 	// Time between automatic checkpoints in the WAL. This is done to allow
 	// more fine-grained WAL files so that restores can be performed with
 	// better precision.
@@ -104,6 +115,7 @@ func NewDB(path string) *DB {
 
 		MinCheckpointPageN: DefaultMinCheckpointPageN,
 		MaxCheckpointPageN: DefaultMaxCheckpointPageN,
+		TruncatePageN:      DefaultTruncatePageN,
 		CheckpointInterval: DefaultCheckpointInterval,
 		MonitorInterval:    DefaultMonitorInterval,
 	}
@@ -740,7 +752,7 @@ func (db *DB) Sync(ctx context.Context) (err error) {
 	}
 
 	// Synchronize real WAL with current shadow WAL.
-	newWALSize, err := db.syncWAL(info)
+	origWALSize, newWALSize, err := db.syncWAL(info)
 	if err != nil {
 		return fmt.Errorf("sync wal: %w", err)
 	}
 
 	// If WAL size is greater than min threshold, attempt checkpoint. 
var checkpoint bool checkpointMode := CheckpointModePassive - if db.MaxCheckpointPageN > 0 && newWALSize >= calcWALSize(db.pageSize, db.MaxCheckpointPageN) { + if db.TruncatePageN > 0 && origWALSize >= calcWALSize(db.pageSize, db.TruncatePageN) { + checkpoint, checkpointMode = true, CheckpointModeTruncate + } else if db.MaxCheckpointPageN > 0 && newWALSize >= calcWALSize(db.pageSize, db.MaxCheckpointPageN) { checkpoint, checkpointMode = true, CheckpointModeRestart } else if newWALSize >= calcWALSize(db.pageSize, db.MinCheckpointPageN) { checkpoint = true @@ -908,29 +922,29 @@ type syncInfo struct { } // syncWAL copies pending bytes from the real WAL to the shadow WAL. -func (db *DB) syncWAL(info syncInfo) (newSize int64, err error) { +func (db *DB) syncWAL(info syncInfo) (origSize int64, newSize int64, err error) { // Copy WAL starting from end of shadow WAL. Exit if no new shadow WAL needed. - newSize, err = db.copyToShadowWAL(info.shadowWALPath) + origSize, newSize, err = db.copyToShadowWAL(info.shadowWALPath) if err != nil { - return newSize, fmt.Errorf("cannot copy to shadow wal: %w", err) + return origSize, newSize, fmt.Errorf("cannot copy to shadow wal: %w", err) } else if !info.restart { - return newSize, nil // If no restart required, exit. + return origSize, newSize, nil // If no restart required, exit. } // Parse index of current shadow WAL file. dir, base := filepath.Split(info.shadowWALPath) index, err := ParseWALPath(base) if err != nil { - return 0, fmt.Errorf("cannot parse shadow wal filename: %s", base) + return 0, 0, fmt.Errorf("cannot parse shadow wal filename: %s", base) } // Start a new shadow WAL file with next index. newShadowWALPath := filepath.Join(dir, FormatWALPath(index+1)) newSize, err = db.initShadowWALFile(newShadowWALPath) if err != nil { - return 0, fmt.Errorf("cannot init shadow wal file: name=%s err=%w", newShadowWALPath, err) + return 0, 0, fmt.Errorf("cannot init shadow wal file: name=%s err=%w", newShadowWALPath, err) } - return newSize, nil + return origSize, newSize, nil } func (db *DB) initShadowWALFile(filename string) (int64, error) { @@ -966,58 +980,64 @@ func (db *DB) initShadowWALFile(filename string) (int64, error) { _ = os.Chown(filename, uid, gid) // Copy as much shadow WAL as available. - newSize, err := db.copyToShadowWAL(filename) + _, newSize, err := db.copyToShadowWAL(filename) if err != nil { return 0, fmt.Errorf("cannot copy to new shadow wal: %w", err) } return newSize, nil } -func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) { +func (db *DB) copyToShadowWAL(filename string) (origWalSize int64, newSize int64, err error) { Tracef("%s: copy-shadow: %s", db.path, filename) r, err := os.Open(db.WALPath()) if err != nil { - return 0, err + return 0, 0, err } defer r.Close() + fi, err := r.Stat() + if err != nil { + return 0, 0, err + } + origWalSize = frameAlign(fi.Size(), db.pageSize) + w, err := os.OpenFile(filename, os.O_RDWR, 0666) if err != nil { - return 0, err + return 0, 0, err } defer w.Close() - fi, err := w.Stat() + fi, err = w.Stat() if err != nil { - return 0, err + return 0, 0, err } origSize := frameAlign(fi.Size(), db.pageSize) // Read shadow WAL header to determine byte order for checksum & salt. 
hdr := make([]byte, WALHeaderSize) if _, err := io.ReadFull(w, hdr); err != nil { - return 0, fmt.Errorf("read header: %w", err) + return 0, 0, fmt.Errorf("read header: %w", err) } hsalt0 := binary.BigEndian.Uint32(hdr[16:]) hsalt1 := binary.BigEndian.Uint32(hdr[20:]) bo, err := headerByteOrder(hdr) if err != nil { - return 0, err + return 0, 0, err } // Read previous checksum. chksum0, chksum1, err := readLastChecksumFrom(w, db.pageSize) if err != nil { - return 0, fmt.Errorf("last checksum: %w", err) + return 0, 0, fmt.Errorf("last checksum: %w", err) } // Seek to correct position on real wal. if _, err := r.Seek(origSize, io.SeekStart); err != nil { - return 0, fmt.Errorf("real wal seek: %w", err) + return 0, 0, fmt.Errorf("real wal seek: %w", err) } else if _, err := w.Seek(origSize, io.SeekStart); err != nil { - return 0, fmt.Errorf("shadow wal seek: %w", err) + return 0, 0, fmt.Errorf("shadow wal seek: %w", err) } // Read through WAL from last position to find the page of the last @@ -1032,7 +1052,7 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) { Tracef("%s: copy-shadow: break %s @ %d; err=%s", db.path, filename, offset, err) break // end of file or partial page } else if err != nil { - return 0, fmt.Errorf("read wal: %w", err) + return 0, 0, fmt.Errorf("read wal: %w", err) } // Read frame salt & compare to header salt. Stop reading on mismatch. @@ -1063,7 +1083,7 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) { newDBSize := binary.BigEndian.Uint32(frame[4:]) if newDBSize != 0 { if _, err := buf.WriteTo(w); err != nil { - return 0, fmt.Errorf("write shadow wal: %w", err) + return 0, 0, fmt.Errorf("write shadow wal: %w", err) } buf.Reset() lastCommitSize = offset @@ -1072,15 +1092,15 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) { // Sync & close. if err := w.Sync(); err != nil { - return 0, err + return 0, 0, err } else if err := w.Close(); err != nil { - return 0, err + return 0, 0, err } // Track total number of bytes written to WAL. db.totalWALBytesCounter.Add(float64(lastCommitSize - origSize)) - return lastCommitSize, nil + return origWalSize, lastCommitSize, nil } // ShadowWALReader opens a reader for a shadow WAL file at a given position. @@ -1249,7 +1269,7 @@ func (db *DB) checkpoint(ctx context.Context, generation, mode string) error { } // Copy shadow WAL before checkpoint to copy as much as possible. - if _, err := db.copyToShadowWAL(shadowWALPath); err != nil { + if _, _, err := db.copyToShadowWAL(shadowWALPath); err != nil { return fmt.Errorf("cannot copy to end of shadow wal before checkpoint: %w", err) } @@ -1284,7 +1304,7 @@ func (db *DB) checkpoint(ctx context.Context, generation, mode string) error { } // Copy the end of the previous WAL before starting a new shadow WAL. - if _, err := db.copyToShadowWAL(shadowWALPath); err != nil { + if _, _, err := db.copyToShadowWAL(shadowWALPath); err != nil { return fmt.Errorf("cannot copy to end of shadow wal: %w", err) } @@ -1343,7 +1363,7 @@ func (db *DB) execCheckpoint(mode string) (err error) { if err := db.db.QueryRow(rawsql).Scan(&row[0], &row[1], &row[2]); err != nil { return err } - Tracef("%s: checkpoint: mode=%v (%d,%d,%d)", db.path, mode, row[0], row[1], row[2]) + log.Printf("%s: checkpoint: mode=%v (%d,%d,%d)", db.path, mode, row[0], row[1], row[2]) // Reacquire the read lock immediately after the checkpoint. 
 	if err := db.acquireReadLock(); err != nil {

From fbd8089aba49d83bc549036508ceb85be90f514b Mon Sep 17 00:00:00 2001
From: Lincoln Stoll
Date: Mon, 31 Oct 2022 09:14:02 +0100
Subject: [PATCH 10/16] Handle errors when deleting objects from S3

I recently noticed that the cost of ListBucket calls was increasing for
an application that was using Litestream. After investigating, it seemed
that the bucket had retained the entire history of data, while
Litestream was continually logging that it was deleting the same data:

```
2022-10-30T12:00:27Z (s3): wal segmented deleted before 0792d3393bf79ced/00000233: n=1428
2022-10-30T13:00:24Z (s3): wal segmented deleted before 0792d3393bf79ced/00000233: n=1428
```

This is occurring because the DeleteObjects call is a batch operation
that returns the individual object deletion errors in the response[1].
The S3 replica client discards the response and only handles errors from
the original API call. I had a misconfigured IAM policy that meant all
deletes were failing, but this never actually bubbled up as a real
error.

To fix this, I added a check of the response body to handle any errors
the operation might have encountered. Because this may include a large
number of errors (in this case 1428 of them), the output is summarized
to avoid an overly large error message.

When items are not found, they will not return an error[2] - they will
still be marked as deleted, so this change should be in line with the
original intentions of this code.

1: https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html#API_DeleteObjects_Example_2
2: https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
---
 s3/replica_client.go | 45 +++++++++++++++++++++++++++++++++++---------
 1 file changed, 36 insertions(+), 9 deletions(-)

diff --git a/s3/replica_client.go b/s3/replica_client.go
index b68628a1..7632a504 100644
--- a/s3/replica_client.go
+++ b/s3/replica_client.go
@@ -208,10 +208,14 @@ func (c *ReplicaClient) DeleteGeneration(ctx context.Context, generation string)
 		n = len(objIDs)
 	}
 
-	if _, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
+	out, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
 		Bucket: aws.String(c.Bucket),
 		Delete: &s3.Delete{Objects: objIDs[:n], Quiet: aws.Bool(true)},
-	}); err != nil {
+	})
+	if err != nil {
+		return err
+	}
+	if err := deleteOutputError(out); err != nil {
 		return err
 	}
 	internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc()
@@ -303,10 +307,14 @@ func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, i
 		return fmt.Errorf("cannot determine snapshot path: %w", err)
 	}
 
-	if _, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
+	out, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
 		Bucket: aws.String(c.Bucket),
 		Delete: &s3.Delete{Objects: []*s3.ObjectIdentifier{{Key: &key}}, Quiet: aws.Bool(true)},
-	}); err != nil {
+	})
+	if err != nil {
+		return err
+	}
+	if err := deleteOutputError(out); err != nil {
 		return err
 	}
 
@@ -405,13 +413,16 @@ func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Po
 	}
 
 	// Delete S3 objects in bulk. 
-	if _, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
+	out, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
 		Bucket: aws.String(c.Bucket),
 		Delete: &s3.Delete{Objects: objIDs[:n], Quiet: aws.Bool(true)},
-	}); err != nil {
+	})
+	if err != nil {
+		return err
+	}
+	if err := deleteOutputError(out); err != nil {
 		return err
 	}
-
 	internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc()
 
 	a = a[n:]
@@ -454,10 +465,14 @@ func (c *ReplicaClient) DeleteAll(ctx context.Context) error {
 		n = len(objIDs)
 	}
 
-	if _, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
+	out, err := c.s3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
 		Bucket: aws.String(c.Bucket),
 		Delete: &s3.Delete{Objects: objIDs[:n], Quiet: aws.Bool(true)},
-	}); err != nil {
+	})
+	if err != nil {
+		return err
+	}
+	if err := deleteOutputError(out); err != nil {
 		return err
 	}
 	internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "DELETE").Inc()
@@ -737,3 +752,15 @@ func isNotExists(err error) bool {
 		return false
 	}
 }
+
+func deleteOutputError(out *s3.DeleteObjectsOutput) error {
+	switch len(out.Errors) {
+	case 0:
+		return nil
+	case 1:
+		return fmt.Errorf("deleting object %s: %s - %s", *out.Errors[0].Key, *out.Errors[0].Code, *out.Errors[0].Message)
+	default:
+		return fmt.Errorf("%d errors occurred deleting objects, %s: %s - %s (and %d others)",
+			len(out.Errors), *out.Errors[0].Key, *out.Errors[0].Code, *out.Errors[0].Message, len(out.Errors)-1)
+	}
+}

From c14eef64d5b3dbcc9c87d07193a5e1d665f46c1e Mon Sep 17 00:00:00 2001
From: Toni Spets
Date: Fri, 30 Dec 2022 14:00:32 +0200
Subject: [PATCH 11/16] Make sqlite3 cli safer to use with live databases

---
 Dockerfile | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/Dockerfile b/Dockerfile
index e44fc7ea..ba9a6374 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -11,7 +11,12 @@ RUN --mount=type=cache,target=/root/.cache/go-build \
 
 FROM alpine
 
-RUN apk add --no-cache sqlite # for debugging
+
+# for debugging
+RUN apk add --no-cache sqlite && \
+    printf '#!/bin/sh\nexec /usr/bin/sqlite3 -cmd "PRAGMA foreign_keys=ON; PRAGMA journal_mode=WAL; PRAGMA wal_autocheckpoint=0; PRAGMA busy_timeout=5000;" "$@"\n' > /usr/local/bin/sqlite3 && \
+    chmod +x /usr/local/bin/sqlite3
+
 COPY --from=builder /usr/local/bin/litestream /usr/local/bin/litestream
 ENTRYPOINT ["/usr/local/bin/litestream"]
 CMD []

From 4fd46a84af095e22b642a83f0c4b68f608680166 Mon Sep 17 00:00:00 2001
From: Erik Kristensen
Date: Tue, 10 Jan 2023 20:58:43 -0700
Subject: [PATCH 12/16] fix: aws credential chain by using aws.Config

---
 s3/replica_client.go | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/s3/replica_client.go b/s3/replica_client.go
index 7632a504..a1206a09 100644
--- a/s3/replica_client.go
+++ b/s3/replica_client.go
@@ -16,7 +16,6 @@ import (
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/aws/awserr"
 	"github.com/aws/aws-sdk-go/aws/credentials"
-	"github.com/aws/aws-sdk-go/aws/defaults"
 	"github.com/aws/aws-sdk-go/aws/session"
 	"github.com/aws/aws-sdk-go/service/s3"
 	"github.com/aws/aws-sdk-go/service/s3/s3manager"
@@ -93,6 +92,8 @@ func (c *ReplicaClient) Init(ctx context.Context) (err error) {
 	if region != "" {
 		config.Region = aws.String(region)
 	}
+	config.CredentialsChainVerboseErrors = aws.Bool(true)
+
 	sess, err := session.NewSession(config)
 	if err != nil {
 		return fmt.Errorf("cannot create aws session: %w", err)
 	}
@@ -105,7 +106,8 @@ func (c *ReplicaClient) Init(ctx 
context.Context) (err error) { // config returns the AWS configuration. Uses the default credential chain // unless a key/secret are explicitly set. func (c *ReplicaClient) config() *aws.Config { - config := defaults.Get().Config + config := &aws.Config{} + if c.AccessKeyID != "" || c.SecretAccessKey != "" { config.Credentials = credentials.NewStaticCredentials(c.AccessKeyID, c.SecretAccessKey, "") } From edad91537466d601407a156f4a9f720c1afa5f52 Mon Sep 17 00:00:00 2001 From: Erik Kristensen Date: Tue, 10 Jan 2023 20:59:39 -0700 Subject: [PATCH 13/16] fix: remove debug code --- s3/replica_client.go | 1 - 1 file changed, 1 deletion(-) diff --git a/s3/replica_client.go b/s3/replica_client.go index a1206a09..6919eae1 100644 --- a/s3/replica_client.go +++ b/s3/replica_client.go @@ -92,7 +92,6 @@ func (c *ReplicaClient) Init(ctx context.Context) (err error) { if region != "" { config.Region = aws.String(region) } - config.CredentialsChainVerboseErrors = aws.Bool(true) sess, err := session.NewSession(config) if err != nil { From 1f1b3782a29c88bb4e7083d2d5a7bb06f058e7c0 Mon Sep 17 00:00:00 2001 From: Toni Spets Date: Wed, 18 Jan 2023 21:16:16 +0200 Subject: [PATCH 14/16] Move scripts out of Dockerfile --- Dockerfile | 14 +++++++++++--- etc/aws-config | 2 ++ etc/aws-k8s-sa-provider | 13 +++++++++++++ etc/sqlite3 | 2 ++ 4 files changed, 28 insertions(+), 3 deletions(-) create mode 100644 etc/aws-config create mode 100755 etc/aws-k8s-sa-provider create mode 100755 etc/sqlite3 diff --git a/Dockerfile b/Dockerfile index ba9a6374..b278eda2 100644 --- a/Dockerfile +++ b/Dockerfile @@ -9,14 +9,22 @@ RUN --mount=type=cache,target=/root/.cache/go-build \ --mount=type=cache,target=/go/pkg \ go build -ldflags "-s -w -X 'main.Version=${LITESTREAM_VERSION}' -extldflags '-static'" -tags osusergo,netgo,sqlite_omit_load_extension -o /usr/local/bin/litestream ./cmd/litestream +# to make copy a single layer later +COPY etc/sqlite3 etc/aws-k8s-sa-provider /usr/local/bin/ FROM alpine +ENV AWS_SDK_LOAD_CONFIG=1 # for debugging RUN apk add --no-cache sqlite && \ - printf '#!/bin/sh\nexec /usr/bin/sqlite3 -cmd "PRAGMA foreign_keys=ON; PRAGMA journal_mode=WAL; PRAGMA wal_autocheckpoint=0; PRAGMA busy_timeout=5000;" "$@"\n' > /usr/local/bin/sqlite3 && \ - chmod +x /usr/local/bin/sqlite3 + mkdir /root/.aws + +COPY etc/aws-config /root/.aws/config +COPY --from=builder \ + /usr/local/bin/litestream \ + /usr/local/bin/sqlite3 \ + /usr/local/bin/aws-k8s-sa-provider \ + /usr/local/bin/ -COPY --from=builder /usr/local/bin/litestream /usr/local/bin/litestream ENTRYPOINT ["/usr/local/bin/litestream"] CMD [] diff --git a/etc/aws-config b/etc/aws-config new file mode 100644 index 00000000..121d37ac --- /dev/null +++ b/etc/aws-config @@ -0,0 +1,2 @@ +[default] +credential_process = /usr/local/bin/aws-k8s-sa-provider diff --git a/etc/aws-k8s-sa-provider b/etc/aws-k8s-sa-provider new file mode 100755 index 00000000..a2e58274 --- /dev/null +++ b/etc/aws-k8s-sa-provider @@ -0,0 +1,13 @@ +#!/bin/sh +DATEFMT=%Y-%m-%dT%H:%M:%SZ +[ -z "$SERVICE_ACCOUNT_TOKEN" ] && exit 1 +[ -f "$SERVICE_ACCOUNT_TOKEN" ] || exit 1 +cat << EOF +{ + "Version": 1, + "AccessKeyId": "aws-k8s-sa-provider@$(date +$DATEFMT)", + "SecretAccessKey": "n/a", + "SessionToken": "$(cat $SERVICE_ACCOUNT_TOKEN)", + "Expiration": "$(date -ud @$(($(date -u +%s) + 300)) +$DATEFMT)" +} +EOF diff --git a/etc/sqlite3 b/etc/sqlite3 new file mode 100755 index 00000000..9a824dbd --- /dev/null +++ b/etc/sqlite3 @@ -0,0 +1,2 @@ +#!/bin/sh +exec /usr/bin/sqlite3 -cmd "PRAGMA 
foreign_keys=ON; PRAGMA journal_mode=WAL; PRAGMA wal_autocheckpoint=0; PRAGMA busy_timeout=5000;" "$@" From 5d54928321114bb7359def483f3c3fd7ddc3d946 Mon Sep 17 00:00:00 2001 From: Toni Spets Date: Fri, 20 Jan 2023 09:02:06 +0200 Subject: [PATCH 15/16] Move AWS config to common location --- Dockerfile | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index b278eda2..dc86a36a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -13,13 +13,14 @@ RUN --mount=type=cache,target=/root/.cache/go-build \ COPY etc/sqlite3 etc/aws-k8s-sa-provider /usr/local/bin/ FROM alpine -ENV AWS_SDK_LOAD_CONFIG=1 +ENV AWS_SDK_LOAD_CONFIG=1 \ + AWS_CONFIG_FILE=/etc/aws-config # for debugging RUN apk add --no-cache sqlite && \ mkdir /root/.aws -COPY etc/aws-config /root/.aws/config +COPY etc/aws-config /etc/ COPY --from=builder \ /usr/local/bin/litestream \ /usr/local/bin/sqlite3 \ From 16d0219aa7b5281794ba5b69de36e99699f78da1 Mon Sep 17 00:00:00 2001 From: Toni Spets Date: Wed, 25 Jan 2023 06:31:20 +0200 Subject: [PATCH 16/16] Add delete command Allows removing a single generation or all files from replica path. --- abs/replica_client.go | 6 ++ cmd/litestream/delete.go | 132 +++++++++++++++++++++++++++++++++++++++ cmd/litestream/main.go | 3 + file/replica_client.go | 12 ++++ gcs/replica_client.go | 6 ++ replica_client.go | 3 + s3/replica_client.go | 2 +- sftp/replica_client.go | 5 ++ 8 files changed, 168 insertions(+), 1 deletion(-) create mode 100644 cmd/litestream/delete.go diff --git a/abs/replica_client.go b/abs/replica_client.go index 4d5e00ef..ff5f8763 100644 --- a/abs/replica_client.go +++ b/abs/replica_client.go @@ -2,6 +2,7 @@ package abs import ( "context" + "errors" "fmt" "io" "net/url" @@ -342,6 +343,11 @@ func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Po return nil } +// DeleteAll deletes everything on the remote path. +func (c *ReplicaClient) DeleteAll(ctx context.Context) (err error) { + return errors.New("DeleteAll stub") +} + type snapshotIterator struct { client *ReplicaClient generation string diff --git a/cmd/litestream/delete.go b/cmd/litestream/delete.go new file mode 100644 index 00000000..d9175bae --- /dev/null +++ b/cmd/litestream/delete.go @@ -0,0 +1,132 @@ +package main + +import ( + "context" + "flag" + "fmt" + "log" + + "github.com/benbjohnson/litestream" +) + +// DeleteCommand represents a command to remove everything from a replica. +type DeleteCommand struct{} + +// Run executes the command. 
+func (c *DeleteCommand) Run(ctx context.Context, args []string) (err error) { + fs := flag.NewFlagSet("litestream-delete", flag.ContinueOnError) + configPath, noExpandEnv := registerConfigFlag(fs) + replicaName := fs.String("replica", "", "replica name") + generationName := fs.String("generation", "", "generation name") + allFiles := fs.Bool("all-files", false, "remove all files") + fs.Usage = c.Usage + if err := fs.Parse(args); err != nil { + return err + } else if fs.NArg() == 0 || fs.Arg(0) == "" { + return fmt.Errorf("database path or replica URL required") + } else if fs.NArg() > 1 { + return fmt.Errorf("too many arguments") + } + + if *generationName == "" && !*allFiles { + return fmt.Errorf("either generation name or -all-files must be specified") + } + + var db *litestream.DB + var r *litestream.Replica + if isURL(fs.Arg(0)) { + if *configPath != "" { + return fmt.Errorf("cannot specify a replica URL and the -config flag") + } + if r, err = NewReplicaFromConfig(&ReplicaConfig{URL: fs.Arg(0)}, nil); err != nil { + return err + } + } else { + if *configPath == "" { + *configPath = DefaultConfigPath() + } + + // Load configuration. + config, err := ReadConfigFile(*configPath, !*noExpandEnv) + if err != nil { + return err + } + + // Lookup database from configuration file by path. + if path, err := expand(fs.Arg(0)); err != nil { + return err + } else if dbc := config.DBConfig(path); dbc == nil { + return fmt.Errorf("database not found in config: %s", path) + } else if db, err = NewDBFromConfig(dbc); err != nil { + return err + } + + // Filter by replica, if specified. + if *replicaName != "" { + if r = db.Replica(*replicaName); r == nil { + return fmt.Errorf("replica %q not found for database %q", *replicaName, db.Path()) + } + } + } + + var replicas []*litestream.Replica + if r != nil { + replicas = []*litestream.Replica{r} + } else { + replicas = db.Replicas + } + + // Delete everything from each replica. + for _, r := range replicas { + if *allFiles { + log.Printf("%s: removing all files from replica", r.Name()) + + if err := r.Client.DeleteAll(ctx); err != nil { + return err + } + } else if *generationName != "" { + log.Printf("%s: removing generation %s", r.Name(), *generationName) + + if err := r.Client.DeleteGeneration(ctx, *generationName); err != nil { + return err + } + } + } + + return nil +} + +// Usage prints the help message to STDOUT. +func (c *DeleteCommand) Usage() { + fmt.Printf(` +The delete command removes data from replicas. Either a generation or -all-files +must be specified. + +Usage: + + litestream delete [arguments] DB_PATH + + litestream delete [arguments] REPLICA_URL + +Arguments: + + -config PATH + Specifies the configuration file. + Defaults to %s + + -no-expand-env + Disables environment variable expansion in configuration file. + + -replica NAME + Optional, filters by replica. + + -generation NAME + Optional, selects a generation. + + -all-files + Optional, removes everything on replica path recursively. 
+ +`[1:], + DefaultConfigPath(), + ) +} diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index e451746a..b08a6b14 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -124,6 +124,8 @@ func (m *Main) Run(ctx context.Context, args []string) (err error) { case "restore": return (&RestoreCommand{}).Run(ctx, args) + case "delete": + return (&DeleteCommand{}).Run(ctx, args) case "snapshots": return (&SnapshotsCommand{}).Run(ctx, args) case "version": @@ -154,6 +156,7 @@ The commands are: generations list available generations for a database replicate runs a server to replicate databases restore recovers database backup from a replica + delete removes everything from a replica snapshots list available snapshots for a database version prints the binary version wal list available WAL files for a database diff --git a/file/replica_client.go b/file/replica_client.go index 178797af..3369d407 100644 --- a/file/replica_client.go +++ b/file/replica_client.go @@ -379,3 +379,15 @@ func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Po } return nil } + +// DeleteAll deletes everything on the remote path. +func (c *ReplicaClient) DeleteAll(ctx context.Context) error { + if c.Path() == "" || c.Path() == "/" { + return fmt.Errorf("refusing to delete root directory") + } + + if err := os.RemoveAll(c.path); err != nil && !os.IsNotExist(err) { + return err + } + return nil +} diff --git a/gcs/replica_client.go b/gcs/replica_client.go index 7b2b2c67..ff2437bf 100644 --- a/gcs/replica_client.go +++ b/gcs/replica_client.go @@ -2,6 +2,7 @@ package gcs import ( "context" + "errors" "fmt" "io" "os" @@ -308,6 +309,11 @@ func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Po return nil } +// DeleteAll deletes everything on the remote path. +func (c *ReplicaClient) DeleteAll(ctx context.Context) (err error) { + return errors.New("DeleteAll stub") +} + type snapshotIterator struct { generation string diff --git a/replica_client.go b/replica_client.go index 3a914e47..82759663 100644 --- a/replica_client.go +++ b/replica_client.go @@ -45,4 +45,7 @@ type ReplicaClient interface { // index/offset within a generation. Returns an os.ErrNotFound error if the // WAL segment does not exist. WALSegmentReader(ctx context.Context, pos Pos) (io.ReadCloser, error) + + // Deletes everything recursively from replica. + DeleteAll(ctx context.Context) error } diff --git a/s3/replica_client.go b/s3/replica_client.go index 6919eae1..b8128233 100644 --- a/s3/replica_client.go +++ b/s3/replica_client.go @@ -432,7 +432,7 @@ func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Po return nil } -// DeleteAll deletes everything on the remote path. Mainly used for testing. +// DeleteAll deletes everything on the remote path. func (c *ReplicaClient) DeleteAll(ctx context.Context) error { if err := c.Init(ctx); err != nil { return err diff --git a/sftp/replica_client.go b/sftp/replica_client.go index 6b082b4a..56ed942f 100644 --- a/sftp/replica_client.go +++ b/sftp/replica_client.go @@ -493,3 +493,8 @@ func (c *ReplicaClient) resetOnConnError(err error) { c.sshClient = nil } } + +// Deletes everything recursively from replica. +func (c *ReplicaClient) DeleteAll(ctx context.Context) (err error) { + return errors.New("DeleteAll stub") +}
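
A note on verifying the WAL-persistence trick from the first patch: the effect of
SQLITE_FCNTL_PERSIST_WAL can be checked outside of Litestream with a short program
using the same mattn/go-sqlite3 hooks the patch registers (SQLiteDriver.ConnectHook
and SetFileControlInt). This is a minimal sketch, not part of the patch series; the
persist-wal-sqlite3 driver name and demo.db path are illustrative.

```go
package main

import (
	"database/sql"
	"fmt"
	"log"
	"os"

	"github.com/mattn/go-sqlite3"
)

func init() {
	// Same technique as PATCH 01/16: mark the WAL as persistent on every new
	// connection so that closing the last connection leaves the -wal file on
	// disk instead of deleting it.
	sql.Register("persist-wal-sqlite3", &sqlite3.SQLiteDriver{
		ConnectHook: func(conn *sqlite3.SQLiteConn) error {
			return conn.SetFileControlInt("main", sqlite3.SQLITE_FCNTL_PERSIST_WAL, 1)
		},
	})
}

func main() {
	db, err := sql.Open("persist-wal-sqlite3", "demo.db?_journal_mode=WAL")
	if err != nil {
		log.Fatal(err)
	}
	if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS t (x INTEGER)`); err != nil {
		log.Fatal(err)
	}
	// Without PERSIST_WAL, closing the last connection checkpoints the WAL
	// and removes demo.db-wal; with it, the file is left in place.
	if err := db.Close(); err != nil {
		log.Fatal(err)
	}
	if _, err := os.Stat("demo.db-wal"); err != nil {
		log.Fatal("WAL file was removed: ", err)
	}
	fmt.Println("WAL file persisted across close")
}
```

This is the same behavior the series relies on when it replaces SoftClose() with a
plain Close() in PATCH 01/16: the database connection can be closed normally without
the WAL being cleaned up underneath the replication process.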