Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for downloading result from s3 #66

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 61 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,17 +1,76 @@
module github.com/uber/athenadriver

go 1.13
go 1.21

toolchain go1.23.0

require (
github.com/DATA-DOG/go-sqlmock v1.4.1
github.com/aws/aws-sdk-go v1.51.3
github.com/cactus/go-statsd-client/statsd v0.0.0-20200423205355-cb0885a1018c
github.com/jedib0t/go-pretty/v6 v6.2.7
github.com/pkg/errors v0.9.1 // indirect
github.com/stretchr/testify v1.7.0
github.com/uber-go/tally v3.3.17+incompatible
github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2
go.uber.org/config v1.4.0
go.uber.org/fx v1.12.0
go.uber.org/zap v1.15.0
)

require (
github.com/BurntSushi/toml v0.3.1 // indirect
github.com/aws/aws-sdk-go-v2 v1.32.2 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.6 // indirect
github.com/aws/aws-sdk-go-v2/config v1.27.43 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.41 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.17 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.21 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.21 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.21 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.2 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.2 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.2 // indirect
github.com/aws/aws-sdk-go-v2/service/s3 v1.65.3 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.24.2 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.2 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.32.2 // indirect
github.com/aws/smithy-go v1.22.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/renameio v0.1.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/jmespath/go-jmespath/internal/testify v1.5.1 // indirect
github.com/kisielk/gotool v1.0.0 // indirect
github.com/kr/pretty v0.1.0 // indirect
github.com/kr/pty v1.1.1 // indirect
github.com/kr/text v0.1.0 // indirect
github.com/mattn/go-runewidth v0.0.13 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pkg/profile v1.6.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/rogpeppe/go-internal v1.3.0 // indirect
github.com/stretchr/objx v0.1.0 // indirect
github.com/yuin/goldmark v1.4.13 // indirect
go.uber.org/atomic v1.6.0 // indirect
go.uber.org/dig v1.9.0 // indirect
go.uber.org/goleak v0.10.0 // indirect
go.uber.org/multierr v1.5.0 // indirect
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/lint v0.0.0-20190930215403-16217165b5de // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/term v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/tools v0.6.0 // indirect
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
gopkg.in/errgo.v2 v2.1.0 // indirect
gopkg.in/yaml.v2 v2.2.8 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
honnef.co/go/tools v0.0.1-2019.2.3 // indirect
)
36 changes: 36 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,42 @@ github.com/aws/aws-sdk-go v1.51.3 h1:OqSyEXcJwf/XhZNVpMRgKlLA9nmbo5X8dwbll4RWxq8
github.com/aws/aws-sdk-go v1.51.3/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk=
github.com/aws/aws-sdk-go v1.54.19 h1:tyWV+07jagrNiCcGRzRhdtVjQs7Vy41NwsuOcl0IbVI=
github.com/aws/aws-sdk-go v1.54.19/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU=
github.com/aws/aws-sdk-go-v2 v1.32.2 h1:AkNLZEyYMLnx/Q/mSKkcMqwNFXMAvFto9bNsHqcTduI=
github.com/aws/aws-sdk-go-v2 v1.32.2/go.mod h1:2SK5n0a2karNTv5tbP1SjsX0uhttou00v/HpXKM1ZUo=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.6 h1:pT3hpW0cOHRJx8Y0DfJUEQuqPild8jRGmSFmBgvydr0=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.6/go.mod h1:j/I2++U0xX+cr44QjHay4Cvxj6FUbnxrgmqN3H1jTZA=
github.com/aws/aws-sdk-go-v2/config v1.27.43 h1:p33fDDihFC390dhhuv8nOmX419wjOSDQRb+USt20RrU=
github.com/aws/aws-sdk-go-v2/config v1.27.43/go.mod h1:pYhbtvg1siOOg8h5an77rXle9tVG8T+BWLWAo7cOukc=
github.com/aws/aws-sdk-go-v2/credentials v1.17.41 h1:7gXo+Axmp+R4Z+AK8YFQO0ZV3L0gizGINCOWxSLY9W8=
github.com/aws/aws-sdk-go-v2/credentials v1.17.41/go.mod h1:u4Eb8d3394YLubphT4jLEwN1rLNq2wFOlT6OuxFwPzU=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.17 h1:TMH3f/SCAWdNtXXVPPu5D6wrr4G5hI1rAxbcocKfC7Q=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.17/go.mod h1:1ZRXLdTpzdJb9fwTMXiLipENRxkGMTn1sfKexGllQCw=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.21 h1:UAsR3xA31QGf79WzpG/ixT9FZvQlh5HY1NRqSHBNOCk=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.21/go.mod h1:JNr43NFf5L9YaG3eKTm7HQzls9J+A9YYcGI5Quh1r2Y=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.21 h1:6jZVETqmYCadGFvrYEQfC5fAQmlo80CeL5psbno6r0s=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.21/go.mod h1:1SR0GbLlnN3QUmYaflZNiH1ql+1qrSiB2vwcJ+4UM60=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.21 h1:7edmS3VOBDhK00b/MwGtGglCm7hhwNYnjJs/PgFdMQE=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.21/go.mod h1:Q9o5h4HoIWG8XfzxqiuK/CGUbepCJ8uTlaE3bAbxytQ=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.0 h1:TToQNkvGguu209puTojY/ozlqy2d/SFNcoLIqTFi42g=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.0/go.mod h1:0jp+ltwkf+SwG2fm/PKo8t4y8pJSgOCO4D8Lz3k0aHQ=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.2 h1:4FMHqLfk0efmTqhXVRL5xYRqlEBNBiRI7N6w4jsEdd4=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.2/go.mod h1:LWoqeWlK9OZeJxsROW2RqrSPvQHKTpp69r/iDjwsSaw=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.2 h1:s7NA1SOw8q/5c0wr8477yOPp0z+uBaXBnLE0XYb0POA=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.2/go.mod h1:fnjjWyAW/Pj5HYOxl9LJqWtEwS7W2qgcRLWP+uWbss0=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.2 h1:t7iUP9+4wdc5lt3E41huP+GvQZJD38WLsgVp4iOtAjg=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.2/go.mod h1:/niFCtmuQNxqx9v8WAPq5qh7EH25U4BF6tjoyq9bObM=
github.com/aws/aws-sdk-go-v2/service/s3 v1.65.3 h1:xxHGZ+wUgZNACQmxtdvP5tgzfsxGS3vPpTP5Hy3iToE=
github.com/aws/aws-sdk-go-v2/service/s3 v1.65.3/go.mod h1:cB6oAuus7YXRZhWCc1wIwPywwZ1XwweNp2TVAEGYeB8=
github.com/aws/aws-sdk-go-v2/service/sso v1.24.2 h1:bSYXVyUzoTHoKalBmwaZxs97HU9DWWI3ehHSAMa7xOk=
github.com/aws/aws-sdk-go-v2/service/sso v1.24.2/go.mod h1:skMqY7JElusiOUjMJMOv1jJsP7YUg7DrhgqZZWuzu1U=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.2 h1:AhmO1fHINP9vFYUE0LHzCWg/LfUWUF+zFPEcY9QXb7o=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.2/go.mod h1:o8aQygT2+MVP0NaV6kbdE1YnnIM8RRVQzoeUH45GOdI=
github.com/aws/aws-sdk-go-v2/service/sts v1.32.2 h1:CiS7i0+FUe+/YY1GvIBLLrR/XNGZ4CtM1Ll0XavNuVo=
github.com/aws/aws-sdk-go-v2/service/sts v1.32.2/go.mod h1:HtaiBI8CjYoNVde8arShXb94UbQQi9L4EMr6D+xGBwo=
github.com/aws/smithy-go v1.22.0 h1:uunKnWlcoL3zO7q+gG2Pk53joueEOsnNB28QdMsmiMM=
github.com/aws/smithy-go v1.22.0/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
github.com/cactus/go-statsd-client/statsd v0.0.0-20200423205355-cb0885a1018c h1:HIGF0r/56+7fuIZw2V4isE22MK6xpxWx7BbV8dJ290w=
github.com/cactus/go-statsd-client/statsd v0.0.0-20200423205355-cb0885a1018c/go.mod h1:l/bIBLeOl9eX+wxJAzxS4TveKRtAqlyDpHjhkfO0MEI=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
56 changes: 55 additions & 1 deletion go/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ package athenadriver
import (
"net/url"
"regexp"
"strings"
"strconv"
"strings"
"time"
)

Expand Down Expand Up @@ -56,6 +56,9 @@ var (
"AWS_REGION",
"AWS_DEFAULT_REGION", // Only read if AWS_SDK_LOAD_CONFIG is also set
}
s3ResultPrefixEnvKey = []string{
"AWS_S3_ATHENA_RESULT_PREFIX",
}
stsRegionalEndpointKey = []string{
"AWS_STS_REGIONAL_ENDPOINTS",
}
Expand Down Expand Up @@ -176,6 +179,57 @@ func (c *Config) GetRegion() string {
return GetFromEnvVal(regionEnvKeys)
}

// SetS3Region is to set S3 region.
func (c *Config) SetS3Region(o string) error {
if len(o) == 0 {
return ErrConfigRegion
}
c.values.Set("s3_region", o)
return nil
}

// GetS3Region is getter of S3 region.
func (c *Config) GetS3Region() string {
if val := c.values.Get("s3_region"); val != "" {
return val
}
return GetFromEnvVal(regionEnvKeys)
}

// SetS3ResultPrefix is to set key prefix for athena result.
func (c *Config) SetS3ResultPrefix(o string) error {
if len(o) == 0 {
return ErrConfigS3ResultPrefix
}
c.values.Set("s3_result_prefix", o)
return nil
}

// GetS3ResultPrefix is getter of key prefix for athena result.
func (c *Config) GetS3ResultPrefix() string {
if val := c.values.Get("s3_result_prefix"); val != "" {
return val
}
return GetFromEnvVal(s3ResultPrefixEnvKey)
}

// SetS3OutputBucket is setter for S3 output bucket
func (c *Config) SetS3OutputBucket(o string) error {
if len(o) == 0 {
return ErrConfigS3OutputBucket
}
c.values.Set("s3_output_bucket", o)
return nil
}

// GetS3OutputBucket is getter of s3 output bucket
func (c *Config) GetS3OutputBucket() string {
if val := c.values.Get("s3_output_bucket"); val != "" {
return val
}
return ""
}

// SetUser is a setter of User.
func (c *Config) SetUser(o string) {
c.dsn.User = url.UserPassword(o, "")
Expand Down
2 changes: 2 additions & 0 deletions go/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ var (
ErrConfigInvalidConfig = errors.New("driver config is invalid")
ErrConfigOutputLocation = errors.New("output location must starts with s3")
ErrConfigRegion = errors.New("region is required")
ErrConfigS3ResultPrefix = errors.New("S3 result prefix is required")
ErrConfigS3OutputBucket = errors.New("S3 output bucket is required")
ErrConfigWGPointer = errors.New("workgroup pointer is nil")
ErrConfigAccessIDRequired = errors.New("AWS access ID is required")
ErrConfigAccessKeyRequired = errors.New("AWS access Key is required")
Expand Down
74 changes: 74 additions & 0 deletions go/rows.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package athenadriver
import (
"context"
"database/sql/driver"
"encoding/csv"
"fmt"
"io"
"strconv"
Expand All @@ -31,6 +32,8 @@ import (

"go.uber.org/zap"

aws_v2_cfg "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go/service/athena/athenaiface"

"github.com/aws/aws-sdk-go/aws"
Expand All @@ -44,6 +47,7 @@ type Rows struct {
queryID string
reachedLastPage bool
ResultOutput *athena.GetQueryResultsOutput
csvReader *csv.Reader
config *Config
tracer *DriverTracer
pageCount int64
Expand Down Expand Up @@ -77,9 +81,61 @@ func NewRows(ctx context.Context, athenaAPI athenaiface.AthenaAPI, queryID strin
if err := r.fetchNextPage(nil); err != nil {
return nil, err
}

if r.ResultOutput.NextToken == nil || *r.ResultOutput.NextToken == "" {
return &r, nil
}
Comment on lines +84 to +86
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An interesting detail is that I fetch that s3 file only if there are multiple pages. If all results are present on the first page, then we don't need to download the S3 file.

I fetch the first page every time because we need metadata information anyway (like types of columns etc.). And doing one extra request is not that problematic for large queries anyway.


csvReader, err := r.DownloadResultFromS3()
if err != nil {
return nil, err
}

// The first line is just a list of columns, we don't need that
csvReader.Read()

r.csvReader = csvReader

return &r, nil
}

func (r *Rows) DownloadResultFromS3() (*csv.Reader, error) {
cfg, err := aws_v2_cfg.LoadDefaultConfig(context.TODO())
if err != nil {
return nil, err
}

cfg.Region = r.config.GetS3Region()

client := s3.NewFromConfig(cfg)

bucket := r.config.GetS3OutputBucket()
path := fmt.Sprintf("%s/%s.csv", r.config.GetS3ResultPrefix(), r.queryID)

output, err := client.GetObject(context.TODO(), &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(path),
})
if err != nil {
return nil, err
}

defer output.Body.Close()

// We serialize the query into a string, I faced some issues when I tried to pipe it into the csv directly
// (I was not able to close Body when csv completed)
var sb strings.Builder
_, err = io.Copy(&sb, output.Body)
if err != nil {
return nil, err
}
resultString := sb.String()

reader := csv.NewReader(strings.NewReader(resultString))

return reader, nil
}

// Columns return Columns metadata.
func (r *Rows) Columns() []string {
var columns []string
Expand All @@ -105,6 +161,24 @@ func (r *Rows) Next(dest []driver.Value) error {
if r.reachedLastPage {
return io.EOF
}

// If there is csvReader available, we should be data from there instead
jan2893 marked this conversation as resolved.
Show resolved Hide resolved
if r.csvReader != nil {
lineData, err := r.csvReader.Read()
if err != nil {
r.reachedLastPage = true
return io.EOF
}

cur := newRow(len(lineData), lineData)
columns := r.ResultOutput.ResultSet.ResultSetMetadata.ColumnInfo
if err := r.convertRow(columns, cur.Data, dest, r.config); err != nil {
return err
}

return nil
}

if len(r.ResultOutput.ResultSet.Rows) == 0 {
if r.ResultOutput.NextToken == nil || *r.ResultOutput.NextToken == "" {
// this means we reach the last page - no token and no rows
Expand Down