Skip to content

Commit

Permalink
Feat: add support for getting result from S3 if there is more than 1 …
Browse files Browse the repository at this point in the history
…page (1000 rows)
  • Loading branch information
jan.kaifer committed Oct 15, 2024
1 parent be26bb1 commit f7772c2
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 3 deletions.
63 changes: 61 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,17 +1,76 @@
module github.com/uber/athenadriver

go 1.13
go 1.21

toolchain go1.23.0

require (
github.com/DATA-DOG/go-sqlmock v1.4.1
github.com/aws/aws-sdk-go v1.51.3
github.com/cactus/go-statsd-client/statsd v0.0.0-20200423205355-cb0885a1018c
github.com/jedib0t/go-pretty/v6 v6.2.7
github.com/pkg/errors v0.9.1 // indirect
github.com/stretchr/testify v1.7.0
github.com/uber-go/tally v3.3.17+incompatible
github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2
go.uber.org/config v1.4.0
go.uber.org/fx v1.12.0
go.uber.org/zap v1.15.0
)

require (
github.com/BurntSushi/toml v0.3.1 // indirect
github.com/aws/aws-sdk-go-v2 v1.32.2 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.6 // indirect
github.com/aws/aws-sdk-go-v2/config v1.27.43 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.41 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.17 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.21 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.21 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.21 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.2 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.2 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.2 // indirect
github.com/aws/aws-sdk-go-v2/service/s3 v1.65.3 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.24.2 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.2 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.32.2 // indirect
github.com/aws/smithy-go v1.22.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/renameio v0.1.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/jmespath/go-jmespath/internal/testify v1.5.1 // indirect
github.com/kisielk/gotool v1.0.0 // indirect
github.com/kr/pretty v0.1.0 // indirect
github.com/kr/pty v1.1.1 // indirect
github.com/kr/text v0.1.0 // indirect
github.com/mattn/go-runewidth v0.0.13 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pkg/profile v1.6.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/rogpeppe/go-internal v1.3.0 // indirect
github.com/stretchr/objx v0.1.0 // indirect
github.com/yuin/goldmark v1.4.13 // indirect
go.uber.org/atomic v1.6.0 // indirect
go.uber.org/dig v1.9.0 // indirect
go.uber.org/goleak v0.10.0 // indirect
go.uber.org/multierr v1.5.0 // indirect
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/lint v0.0.0-20190930215403-16217165b5de // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/term v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/tools v0.6.0 // indirect
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
gopkg.in/errgo.v2 v2.1.0 // indirect
gopkg.in/yaml.v2 v2.2.8 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
honnef.co/go/tools v0.0.1-2019.2.3 // indirect
)
36 changes: 36 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,42 @@ github.com/aws/aws-sdk-go v1.51.3 h1:OqSyEXcJwf/XhZNVpMRgKlLA9nmbo5X8dwbll4RWxq8
github.com/aws/aws-sdk-go v1.51.3/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk=
github.com/aws/aws-sdk-go v1.54.19 h1:tyWV+07jagrNiCcGRzRhdtVjQs7Vy41NwsuOcl0IbVI=
github.com/aws/aws-sdk-go v1.54.19/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU=
github.com/aws/aws-sdk-go-v2 v1.32.2 h1:AkNLZEyYMLnx/Q/mSKkcMqwNFXMAvFto9bNsHqcTduI=
github.com/aws/aws-sdk-go-v2 v1.32.2/go.mod h1:2SK5n0a2karNTv5tbP1SjsX0uhttou00v/HpXKM1ZUo=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.6 h1:pT3hpW0cOHRJx8Y0DfJUEQuqPild8jRGmSFmBgvydr0=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.6/go.mod h1:j/I2++U0xX+cr44QjHay4Cvxj6FUbnxrgmqN3H1jTZA=
github.com/aws/aws-sdk-go-v2/config v1.27.43 h1:p33fDDihFC390dhhuv8nOmX419wjOSDQRb+USt20RrU=
github.com/aws/aws-sdk-go-v2/config v1.27.43/go.mod h1:pYhbtvg1siOOg8h5an77rXle9tVG8T+BWLWAo7cOukc=
github.com/aws/aws-sdk-go-v2/credentials v1.17.41 h1:7gXo+Axmp+R4Z+AK8YFQO0ZV3L0gizGINCOWxSLY9W8=
github.com/aws/aws-sdk-go-v2/credentials v1.17.41/go.mod h1:u4Eb8d3394YLubphT4jLEwN1rLNq2wFOlT6OuxFwPzU=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.17 h1:TMH3f/SCAWdNtXXVPPu5D6wrr4G5hI1rAxbcocKfC7Q=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.17/go.mod h1:1ZRXLdTpzdJb9fwTMXiLipENRxkGMTn1sfKexGllQCw=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.21 h1:UAsR3xA31QGf79WzpG/ixT9FZvQlh5HY1NRqSHBNOCk=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.21/go.mod h1:JNr43NFf5L9YaG3eKTm7HQzls9J+A9YYcGI5Quh1r2Y=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.21 h1:6jZVETqmYCadGFvrYEQfC5fAQmlo80CeL5psbno6r0s=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.21/go.mod h1:1SR0GbLlnN3QUmYaflZNiH1ql+1qrSiB2vwcJ+4UM60=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.21 h1:7edmS3VOBDhK00b/MwGtGglCm7hhwNYnjJs/PgFdMQE=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.21/go.mod h1:Q9o5h4HoIWG8XfzxqiuK/CGUbepCJ8uTlaE3bAbxytQ=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.0 h1:TToQNkvGguu209puTojY/ozlqy2d/SFNcoLIqTFi42g=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.0/go.mod h1:0jp+ltwkf+SwG2fm/PKo8t4y8pJSgOCO4D8Lz3k0aHQ=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.2 h1:4FMHqLfk0efmTqhXVRL5xYRqlEBNBiRI7N6w4jsEdd4=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.2/go.mod h1:LWoqeWlK9OZeJxsROW2RqrSPvQHKTpp69r/iDjwsSaw=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.2 h1:s7NA1SOw8q/5c0wr8477yOPp0z+uBaXBnLE0XYb0POA=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.2/go.mod h1:fnjjWyAW/Pj5HYOxl9LJqWtEwS7W2qgcRLWP+uWbss0=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.2 h1:t7iUP9+4wdc5lt3E41huP+GvQZJD38WLsgVp4iOtAjg=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.2/go.mod h1:/niFCtmuQNxqx9v8WAPq5qh7EH25U4BF6tjoyq9bObM=
github.com/aws/aws-sdk-go-v2/service/s3 v1.65.3 h1:xxHGZ+wUgZNACQmxtdvP5tgzfsxGS3vPpTP5Hy3iToE=
github.com/aws/aws-sdk-go-v2/service/s3 v1.65.3/go.mod h1:cB6oAuus7YXRZhWCc1wIwPywwZ1XwweNp2TVAEGYeB8=
github.com/aws/aws-sdk-go-v2/service/sso v1.24.2 h1:bSYXVyUzoTHoKalBmwaZxs97HU9DWWI3ehHSAMa7xOk=
github.com/aws/aws-sdk-go-v2/service/sso v1.24.2/go.mod h1:skMqY7JElusiOUjMJMOv1jJsP7YUg7DrhgqZZWuzu1U=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.2 h1:AhmO1fHINP9vFYUE0LHzCWg/LfUWUF+zFPEcY9QXb7o=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.2/go.mod h1:o8aQygT2+MVP0NaV6kbdE1YnnIM8RRVQzoeUH45GOdI=
github.com/aws/aws-sdk-go-v2/service/sts v1.32.2 h1:CiS7i0+FUe+/YY1GvIBLLrR/XNGZ4CtM1Ll0XavNuVo=
github.com/aws/aws-sdk-go-v2/service/sts v1.32.2/go.mod h1:HtaiBI8CjYoNVde8arShXb94UbQQi9L4EMr6D+xGBwo=
github.com/aws/smithy-go v1.22.0 h1:uunKnWlcoL3zO7q+gG2Pk53joueEOsnNB28QdMsmiMM=
github.com/aws/smithy-go v1.22.0/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
github.com/cactus/go-statsd-client/statsd v0.0.0-20200423205355-cb0885a1018c h1:HIGF0r/56+7fuIZw2V4isE22MK6xpxWx7BbV8dJ290w=
github.com/cactus/go-statsd-client/statsd v0.0.0-20200423205355-cb0885a1018c/go.mod h1:l/bIBLeOl9eX+wxJAzxS4TveKRtAqlyDpHjhkfO0MEI=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
39 changes: 38 additions & 1 deletion go/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ package athenadriver
import (
"net/url"
"regexp"
"strings"
"strconv"
"strings"
"time"
)

Expand Down Expand Up @@ -56,6 +56,9 @@ var (
"AWS_REGION",
"AWS_DEFAULT_REGION", // Only read if AWS_SDK_LOAD_CONFIG is also set
}
s3ResultPrefixEnvKey = []string{
"AWS_S3_ATHENA_RESULT_PREFIX",
}
stsRegionalEndpointKey = []string{
"AWS_STS_REGIONAL_ENDPOINTS",
}
Expand Down Expand Up @@ -176,6 +179,40 @@ func (c *Config) GetRegion() string {
return GetFromEnvVal(regionEnvKeys)
}

// SetS3Region is to set S3 region.
func (c *Config) SetS3Region(o string) error {
if len(o) == 0 {
return ErrConfigRegion
}
c.values.Set("s3_region", o)
return nil
}

// GetS3Region is getter of S3 region.
func (c *Config) GetS3Region() string {
if val := c.values.Get("s3_region"); val != "" {
return val
}
return GetFromEnvVal(regionEnvKeys)
}

// SetS3ResultPrefix is to set key prefix for athena result.
func (c *Config) SetS3ResultPrefix(o string) error {
if len(o) == 0 {
return ErrConfigS3ResultPrefix
}
c.values.Set("s3_result_prefix", o)
return nil
}

// GetS3ResultPrefix is getter of key prefix for athena result.
func (c *Config) GetS3ResultPrefix() string {
if val := c.values.Get("s3_result_prefix"); val != "" {
return val
}
return GetFromEnvVal(s3ResultPrefixEnvKey)
}

// SetUser is a setter of User.
func (c *Config) SetUser(o string) {
c.dsn.User = url.UserPassword(o, "")
Expand Down
1 change: 1 addition & 0 deletions go/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var (
ErrConfigInvalidConfig = errors.New("driver config is invalid")
ErrConfigOutputLocation = errors.New("output location must starts with s3")
ErrConfigRegion = errors.New("region is required")
ErrConfigS3ResultPrefix = errors.New("S3 result prefix is required")
ErrConfigWGPointer = errors.New("workgroup pointer is nil")
ErrConfigAccessIDRequired = errors.New("AWS access ID is required")
ErrConfigAccessKeyRequired = errors.New("AWS access Key is required")
Expand Down
74 changes: 74 additions & 0 deletions go/rows.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package athenadriver
import (
"context"
"database/sql/driver"
"encoding/csv"
"fmt"
"io"
"strconv"
Expand All @@ -31,6 +32,8 @@ import (

"go.uber.org/zap"

aws_v2_cfg "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go/service/athena/athenaiface"

"github.com/aws/aws-sdk-go/aws"
Expand All @@ -44,6 +47,7 @@ type Rows struct {
queryID string
reachedLastPage bool
ResultOutput *athena.GetQueryResultsOutput
csvReader *csv.Reader
config *Config
tracer *DriverTracer
pageCount int64
Expand Down Expand Up @@ -77,9 +81,61 @@ func NewRows(ctx context.Context, athenaAPI athenaiface.AthenaAPI, queryID strin
if err := r.fetchNextPage(nil); err != nil {
return nil, err
}

if r.ResultOutput.NextToken == nil || *r.ResultOutput.NextToken == "" {
return &r, nil
}

csvReader, err := r.DownloadResultFromS3()
if err != nil {
return nil, err
}

// The first line is just a list of columns, we don't need that
csvReader.Read()

r.csvReader = csvReader

return &r, nil
}

func (r *Rows) DownloadResultFromS3() (*csv.Reader, error) {
cfg, err := aws_v2_cfg.LoadDefaultConfig(context.TODO())
if err != nil {
return nil, err
}

cfg.Region = r.config.GetS3Region()

client := s3.NewFromConfig(cfg)

bucket := r.config.GetOutputBucket()
path := fmt.Sprintf("%s/%s.csv", r.config.GetS3ResultPrefix(), r.queryID)

output, err := client.GetObject(context.TODO(), &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(path),
})
if err != nil {
return nil, err
}

defer output.Body.Close()

// We serialize the query into a string, I faced some issues when I tried to pipe it into the csv directly
// (I was not able to close Body when csv completed)
var sb strings.Builder
_, err = io.Copy(&sb, output.Body)
if err != nil {
return nil, err
}
resultString := sb.String()

reader := csv.NewReader(strings.NewReader(resultString))

return reader, nil
}

// Columns return Columns metadata.
func (r *Rows) Columns() []string {
var columns []string
Expand All @@ -105,6 +161,24 @@ func (r *Rows) Next(dest []driver.Value) error {
if r.reachedLastPage {
return io.EOF
}

// If there is csvReader available, we should be data from there instead
if r.csvReader != nil {
lineData, err := r.csvReader.Read()
if err != nil {
r.reachedLastPage = true
return io.EOF
}

cur := newRow(len(lineData), lineData)
columns := r.ResultOutput.ResultSet.ResultSetMetadata.ColumnInfo
if err := r.convertRow(columns, cur.Data, dest, r.config); err != nil {
return err
}

return nil
}

if len(r.ResultOutput.ResultSet.Rows) == 0 {
if r.ResultOutput.NextToken == nil || *r.ResultOutput.NextToken == "" {
// this means we reach the last page - no token and no rows
Expand Down

0 comments on commit f7772c2

Please sign in to comment.