Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add configurable auto-checksum #1990

Merged
merged 2 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions api-put-object-multipart.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"encoding/hex"
"encoding/xml"
"fmt"
"hash/crc32"
"io"
"net/http"
"net/url"
Expand Down Expand Up @@ -87,7 +86,7 @@ func (c *Client) putObjectMultipartNoStream(ctx context.Context, bucketName, obj
if opts.UserMetadata == nil {
opts.UserMetadata = make(map[string]string, 1)
}
opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C"
opts.UserMetadata["X-Amz-Checksum-Algorithm"] = opts.AutoChecksum.String()
}

// Initiate a new multipart upload.
Expand Down Expand Up @@ -116,7 +115,7 @@ func (c *Client) putObjectMultipartNoStream(ctx context.Context, bucketName, obj
// CRC32C is ~50% faster on AMD64 @ 30GB/s
var crcBytes []byte
customHeader := make(http.Header)
crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
crc := opts.AutoChecksum.Hasher()
for partNumber <= totalPartsCount {
length, rErr := readFull(reader, buf)
if rErr == io.EOF && partNumber > 1 {
Expand Down Expand Up @@ -154,7 +153,7 @@ func (c *Client) putObjectMultipartNoStream(ctx context.Context, bucketName, obj
crc.Reset()
crc.Write(buf[:length])
cSum := crc.Sum(nil)
customHeader.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(cSum))
customHeader.Set(opts.AutoChecksum.Key(), base64.StdEncoding.EncodeToString(cSum))
crcBytes = append(crcBytes, cSum...)
}

Expand Down Expand Up @@ -202,12 +201,13 @@ func (c *Client) putObjectMultipartNoStream(ctx context.Context, bucketName, obj
sort.Sort(completedParts(complMultipartUpload.Parts))
opts = PutObjectOptions{
ServerSideEncryption: opts.ServerSideEncryption,
AutoChecksum: opts.AutoChecksum,
}
if len(crcBytes) > 0 {
// Add hash of hashes.
crc.Reset()
crc.Write(crcBytes)
opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))}
opts.UserMetadata = map[string]string{opts.AutoChecksum.Key(): base64.StdEncoding.EncodeToString(crc.Sum(nil))}
}
uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
if err != nil {
Expand Down
39 changes: 22 additions & 17 deletions api-put-object-streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"context"
"encoding/base64"
"fmt"
"hash/crc32"
"io"
"net/http"
"net/url"
Expand Down Expand Up @@ -115,7 +114,7 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN
if opts.UserMetadata == nil {
opts.UserMetadata = make(map[string]string, 1)
}
opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C"
opts.UserMetadata["X-Amz-Checksum-Algorithm"] = opts.AutoChecksum.String()
}
// Initiate a new multipart upload.
uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
Expand Down Expand Up @@ -195,10 +194,10 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN
sectionReader := newHook(io.NewSectionReader(reader, readOffset, partSize), opts.Progress)
trailer := make(http.Header, 1)
if withChecksum {
crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
trailer.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(crc.Sum(nil)))
crc := opts.AutoChecksum.Hasher()
trailer.Set(opts.AutoChecksum.Key(), base64.StdEncoding.EncodeToString(crc.Sum(nil)))
sectionReader = newHashReaderWrapper(sectionReader, crc, func(hash []byte) {
trailer.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(hash))
trailer.Set(opts.AutoChecksum.Key(), base64.StdEncoding.EncodeToString(hash))
})
}

Expand Down Expand Up @@ -271,17 +270,18 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN

opts = PutObjectOptions{
ServerSideEncryption: opts.ServerSideEncryption,
AutoChecksum: opts.AutoChecksum,
}
if withChecksum {
// Add hash of hashes.
crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
crc := opts.AutoChecksum.Hasher()
for _, part := range complMultipartUpload.Parts {
cs, err := base64.StdEncoding.DecodeString(part.ChecksumCRC32C)
cs, err := base64.StdEncoding.DecodeString(part.Checksum(opts.AutoChecksum))
if err == nil {
crc.Write(cs)
}
}
opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))}
opts.UserMetadata = map[string]string{opts.AutoChecksum.KeyCapitalized(): base64.StdEncoding.EncodeToString(crc.Sum(nil))}
}

uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
Expand All @@ -308,7 +308,7 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b
if opts.UserMetadata == nil {
opts.UserMetadata = make(map[string]string, 1)
}
opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C"
opts.UserMetadata["X-Amz-Checksum-Algorithm"] = opts.AutoChecksum.String()
}

// Calculate the optimal parts info for a given size.
Expand Down Expand Up @@ -337,7 +337,7 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b
// CRC32C is ~50% faster on AMD64 @ 30GB/s
var crcBytes []byte
customHeader := make(http.Header)
crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
crc := opts.AutoChecksum.Hasher()
md5Hash := c.md5Hasher()
defer md5Hash.Close()

Expand Down Expand Up @@ -381,7 +381,7 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b
crc.Reset()
crc.Write(buf[:length])
cSum := crc.Sum(nil)
customHeader.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(cSum))
customHeader.Set(opts.AutoChecksum.KeyCapitalized(), base64.StdEncoding.EncodeToString(cSum))
crcBytes = append(crcBytes, cSum...)
}

Expand Down Expand Up @@ -433,12 +433,13 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b

opts = PutObjectOptions{
ServerSideEncryption: opts.ServerSideEncryption,
AutoChecksum: opts.AutoChecksum,
}
if len(crcBytes) > 0 {
// Add hash of hashes.
crc.Reset()
crc.Write(crcBytes)
opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))}
opts.UserMetadata = map[string]string{opts.AutoChecksum.KeyCapitalized(): base64.StdEncoding.EncodeToString(crc.Sum(nil))}
}
uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
if err != nil {
Expand Down Expand Up @@ -467,7 +468,7 @@ func (c *Client) putObjectMultipartStreamParallel(ctx context.Context, bucketNam
if opts.UserMetadata == nil {
opts.UserMetadata = make(map[string]string, 1)
}
opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C"
opts.UserMetadata["X-Amz-Checksum-Algorithm"] = opts.AutoChecksum.String()
}

// Cancel all when an error occurs.
Expand Down Expand Up @@ -500,7 +501,7 @@ func (c *Client) putObjectMultipartStreamParallel(ctx context.Context, bucketNam
// Create checksums
// CRC32C is ~50% faster on AMD64 @ 30GB/s
var crcBytes []byte
crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
crc := opts.AutoChecksum.Hasher()

// Total data read and written to server. should be equal to 'size' at the end of the call.
var totalUploadedSize int64
Expand Down Expand Up @@ -558,7 +559,7 @@ func (c *Client) putObjectMultipartStreamParallel(ctx context.Context, bucketNam
crc.Reset()
crc.Write(buf[:length])
cSum := crc.Sum(nil)
customHeader.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(cSum))
customHeader.Set(opts.AutoChecksum.Key(), base64.StdEncoding.EncodeToString(cSum))
crcBytes = append(crcBytes, cSum...)
}

Expand Down Expand Up @@ -639,12 +640,13 @@ func (c *Client) putObjectMultipartStreamParallel(ctx context.Context, bucketNam

opts = PutObjectOptions{
ServerSideEncryption: opts.ServerSideEncryption,
AutoChecksum: opts.AutoChecksum,
}
if len(crcBytes) > 0 {
// Add hash of hashes.
crc.Reset()
crc.Write(crcBytes)
opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))}
opts.UserMetadata = map[string]string{opts.AutoChecksum.KeyCapitalized(): base64.StdEncoding.EncodeToString(crc.Sum(nil))}
}
uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
if err != nil {
Expand Down Expand Up @@ -765,7 +767,10 @@ func (c *Client) putObjectDo(ctx context.Context, bucketName, objectName string,
contentMD5Base64: md5Base64,
contentSHA256Hex: sha256Hex,
streamSha256: !opts.DisableContentSha256,
addCrc: addCrc,
}
if addCrc {
opts.AutoChecksum.SetDefault(ChecksumCRC32C)
reqMetadata.addCrc = &opts.AutoChecksum
}
if opts.Internal.SourceVersionID != "" {
if opts.Internal.SourceVersionID != nullVersionID {
Expand Down
16 changes: 11 additions & 5 deletions api-put-object.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"encoding/base64"
"errors"
"fmt"
"hash/crc32"
"io"
"net/http"
"sort"
Expand Down Expand Up @@ -90,6 +89,11 @@ type PutObjectOptions struct {
DisableContentSha256 bool
DisableMultipart bool

// AutoChecksum is the type of checksum that will be added if no other checksum is added,
// like MD5 or SHA256 streaming checksum, and it is feasible for the upload type.
// If none is specified CRC32C is used, since it is generally the fastest.
AutoChecksum ChecksumType

// ConcurrentStreamParts will create NumThreads buffers of PartSize bytes,
// fill them serially and upload them in parallel.
// This can be used for faster uploads on non-seekable or slow-to-seek input.
Expand Down Expand Up @@ -300,6 +304,7 @@ func (c *Client) putObjectCommon(ctx context.Context, bucketName, objectName str
if size > int64(maxMultipartPutObjectSize) {
return UploadInfo{}, errEntityTooLarge(size, maxMultipartPutObjectSize, bucketName, objectName)
}
opts.AutoChecksum.SetDefault(ChecksumCRC32C)

// NOTE: Streaming signature is not supported by GCS.
if s3utils.IsGoogleEndpoint(*c.endpointURL) {
Expand Down Expand Up @@ -361,7 +366,7 @@ func (c *Client) putObjectMultipartStreamNoLength(ctx context.Context, bucketNam
if opts.UserMetadata == nil {
opts.UserMetadata = make(map[string]string, 1)
}
opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C"
opts.UserMetadata["X-Amz-Checksum-Algorithm"] = opts.AutoChecksum.String()
}

// Initiate a new multipart upload.
Expand Down Expand Up @@ -390,7 +395,7 @@ func (c *Client) putObjectMultipartStreamNoLength(ctx context.Context, bucketNam
// CRC32C is ~50% faster on AMD64 @ 30GB/s
var crcBytes []byte
customHeader := make(http.Header)
crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
crc := opts.AutoChecksum.Hasher()

for partNumber <= totalPartsCount {
length, rerr := readFull(reader, buf)
Expand All @@ -413,7 +418,7 @@ func (c *Client) putObjectMultipartStreamNoLength(ctx context.Context, bucketNam
crc.Reset()
crc.Write(buf[:length])
cSum := crc.Sum(nil)
customHeader.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(cSum))
customHeader.Set(opts.AutoChecksum.Key(), base64.StdEncoding.EncodeToString(cSum))
crcBytes = append(crcBytes, cSum...)
}

Expand Down Expand Up @@ -466,12 +471,13 @@ func (c *Client) putObjectMultipartStreamNoLength(ctx context.Context, bucketNam

opts = PutObjectOptions{
ServerSideEncryption: opts.ServerSideEncryption,
AutoChecksum: opts.AutoChecksum,
}
if len(crcBytes) > 0 {
// Add hash of hashes.
crc.Reset()
crc.Write(crcBytes)
opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))}
opts.UserMetadata = map[string]string{opts.AutoChecksum.KeyCapitalized(): base64.StdEncoding.EncodeToString(crc.Sum(nil))}
}
uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
if err != nil {
Expand Down
16 changes: 16 additions & 0 deletions api-s3-datatypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,22 @@ type CompletePart struct {
ChecksumSHA256 string `xml:"ChecksumSHA256,omitempty"`
}

// Checksum looks up the part checksum that corresponds to checksum type t.
// The types are tested in the order CRC32C, CRC32, SHA1, SHA256 and the
// field belonging to the first match is returned. The empty string is
// returned when t matches none of them or when the matching field is empty.
func (c CompletePart) Checksum(t ChecksumType) string {
	if t.Is(ChecksumCRC32C) {
		return c.ChecksumCRC32C
	}
	if t.Is(ChecksumCRC32) {
		return c.ChecksumCRC32
	}
	if t.Is(ChecksumSHA1) {
		return c.ChecksumSHA1
	}
	if t.Is(ChecksumSHA256) {
		return c.ChecksumSHA256
	}
	// No known checksum type matched.
	return ""
}

// completeMultipartUpload container for completing multipart upload.
type completeMultipartUpload struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CompleteMultipartUpload" json:"-"`
Expand Down
11 changes: 5 additions & 6 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"encoding/base64"
"errors"
"fmt"
"hash/crc32"
"io"
"math/rand"
"net"
Expand Down Expand Up @@ -471,7 +470,7 @@ type requestMetadata struct {
contentMD5Base64 string // carries base64 encoded md5sum
contentSHA256Hex string // carries hex encoded sha256sum
streamSha256 bool
addCrc bool
addCrc *ChecksumType
trailer http.Header // (http.Request).Trailer. Requires v4 signature.
}

Expand Down Expand Up @@ -616,16 +615,16 @@ func (c *Client) executeMethod(ctx context.Context, method string, metadata requ
}
}

if metadata.addCrc && metadata.contentLength > 0 {
if metadata.addCrc != nil && metadata.contentLength > 0 {
if metadata.trailer == nil {
metadata.trailer = make(http.Header, 1)
}
crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
crc := metadata.addCrc.Hasher()
metadata.contentBody = newHashReaderWrapper(metadata.contentBody, crc, func(hash []byte) {
// Update trailer when done.
metadata.trailer.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(hash))
metadata.trailer.Set(metadata.addCrc.Key(), base64.StdEncoding.EncodeToString(hash))
})
metadata.trailer.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(crc.Sum(nil)))
metadata.trailer.Set(metadata.addCrc.Key(), base64.StdEncoding.EncodeToString(crc.Sum(nil)))
}

// Create cancel context to control 'newRetryTimer' go routine.
Expand Down
13 changes: 13 additions & 0 deletions checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"hash/crc32"
"io"
"math/bits"
"net/http"
)

// ChecksumType contains information about the checksum type.
Expand Down Expand Up @@ -78,6 +79,11 @@ func (c ChecksumType) Key() string {
return ""
}

// KeyCapitalized returns the header key reported by Key, rewritten into
// canonical header form by http.CanonicalHeaderKey.
func (c ChecksumType) KeyCapitalized() string {
	key := c.Key()
	return http.CanonicalHeaderKey(key)
}

// RawByteLen returns the size of the un-encoded checksum.
func (c ChecksumType) RawByteLen() int {
switch c & checksumMask {
Expand Down Expand Up @@ -112,6 +118,13 @@ func (c ChecksumType) IsSet() bool {
return bits.OnesCount32(uint32(c)) == 1
}

// SetDefault assigns t to the receiver when IsSet reports false for it.
// A receiver for which IsSet reports true is left untouched.
func (c *ChecksumType) SetDefault(t ChecksumType) {
	if c.IsSet() {
		return
	}
	*c = t
}

// String returns the type as a string.
// CRC32, CRC32C, SHA1, and SHA256 for valid values.
// Empty string for unset and "<invalid>" if not valid.
Expand Down
Loading
Loading