Skip to content

Commit

Permalink
Implement PARTITION and TRY PARTITIONED QUERY (#46)
Browse files Browse the repository at this point in the history
* Implement PARTITION and TRY PARTITIONED QUERY

* Update README.md
  • Loading branch information
apstndb authored Nov 21, 2024
1 parent ad63cc4 commit ea7fdbf
Show file tree
Hide file tree
Showing 4 changed files with 224 additions and 47 deletions.
43 changes: 43 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ You can control your Spanner databases with idiomatic SQL commands.
* Can use embedded emulator (`--embedded-emulator`)
* Support [query parameters](#query-parameter-support)
* gRPC logging(`--log-grpc`)
* Test root-partitionable with [`TRY PARTITIONED QUERY <sql>` command](#test-root-partitionable)
* Respects batch use cases as well as interactive use cases
* More `gcloud spanner databases execute-sql` compatibilities
* Support compatible flags (`--sql`, `--query-mode`)
Expand Down Expand Up @@ -277,6 +278,8 @@ and `{}` for a mutually exclusive keyword.
| Rollback Read-Write Transaction | `ROLLBACK;` | |
| Start Read-Only Transaction | `BEGIN RO [{<seconds>\|<RFC3339-formatted time>}] [PRIORITY {HIGH\|MEDIUM\|LOW}] [TAG <tag>];` | `<seconds>` and `<RFC3339-formatted time>` are used for stale reads. See [Request Priority](#request-priority) for details on the priority. The tag you set is used as the request tag. See also [Transaction Tags and Request Tags](#transaction-tags-and-request-tags).|
| End Read-Only Transaction | `CLOSE;` | |
| Test root-partitionable | `TRY PARTITIONED QUERY <sql>;` | |
| Show partition tokens of partition query | `PARTITION <sql>;` | |
| Exit CLI | `EXIT;` | |
| Show variable | `SHOW VARIABLE <name>;` | |
| Set variable | `SET <name> = <value>;` | |
Expand Down Expand Up @@ -736,6 +739,46 @@ Empty set (0.00 sec)
Empty set (0.00 sec)
```

## Partition Queries

spanner-mycli has some partition query functionality.

### Test root-partitionable

You can test whether a query is root-partitionable using the `TRY PARTITIONED QUERY` command.

```
spanner> TRY PARTITIONED QUERY SELECT * FROM Singers;
+--------------------+
| Root_Partitionable |
+--------------------+
| TRUE |
+--------------------+
1 rows in set (0.78 sec)
spanner> TRY PARTITIONED QUERY SELECT * FROM Singers ORDER BY SingerId;
ERROR: query can't be a partition query: rpc error: code = InvalidArgument desc = Query is not root partitionable since it does not have a DistributedUnion at the root. Please check the conditions for a query to be root-partitionable.
error details: name = Help desc = Conditions for a query to be root-partitionable. url = https://cloud.google.com/spanner/docs/reads#read_data_in_parallel
```

### Show partition tokens

You can show partition tokens using the `PARTITION` command.

Note: spanner-mycli does not clean up batch read-only transactions, which may prevent resources from being freed until they time out.

```
spanner> PARTITION SELECT * FROM Singers;
+-------------------------------------------------------------------------------------------------+
| Partition_Token |
+-------------------------------------------------------------------------------------------------+
| QUw0MGxyRjJDZENocXc0TkZnR3NxVHN1QnFCMy1yWkxWZmlIdFhhc2U4T2lWOGJRVFhJRkgydU1URmZUb2dBLXVvZE1O... |
| QUw0MGxyRzhPdmZUR04xclFQVTJKQXlVMEJjZFBUWTV3NUFsT2x0VGxTNHZWSExsejQweXNUWUFtUXd5ZjBzWEhmQ0Fa... |
| QUw0MGxyR3paSk96WUNqdjFsQW9tc2UwOFFoNlA4SzhHUzNWQVltNzVlRHZxdjZpUmFVSFN2UmtBanozc0hEaE9Iem9x... |
+-------------------------------------------------------------------------------------------------+
3 rows in set (0.65 sec)
```

## Using with the Cloud Spanner Emulator

This tool supports the [Cloud Spanner Emulator](https://cloud.google.com/spanner/docs/emulator) via the [`SPANNER_EMULATOR_HOST` environment variable](https://cloud.google.com/spanner/docs/emulator#client-libraries).
Expand Down
58 changes: 29 additions & 29 deletions cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,11 +574,12 @@ func maxIndex(ignoreMax int, adjustWidths []int, seq iter.Seq[WidthCount]) (int,
hiter.Pairs(slices.Values(adjustWidths), seq)))
}

func calculateOptimalWidth(debug bool, screenWidth int, types []*sppb.StructType_Field, rows []Row) []int {
func calculateOptimalWidth(debug bool, screenWidth int, header []string, rows []Row) []int {

// table overhead is:
// len(`| |`) +
// len(` | `) * len(columns) - 1
overheadWidth := 4 + 3*(len(types)-1)
overheadWidth := 4 + 3*(len(header)-1)

// don't mutate
termWidthWithoutOverhead := screenWidth - overheadWidth
Expand All @@ -591,23 +592,21 @@ func calculateOptimalWidth(debug bool, screenWidth int, types []*sppb.StructType
return fmt.Sprintf("remaining %v, adjustedWidths: %v", remainsWidth-lo.Sum(adjustedWidths), adjustedWidths)
}

adjustedWidths := adjustByName(types, termWidthWithoutOverhead)
adjustedWidths := adjustByHeader(header, termWidthWithoutOverhead)

if debug {
log.Println("adjustByName:", formatIntermediate(termWidthWithoutOverhead, adjustedWidths))
}

var transposedRows [][]string
for columnIdx := range len(types) {
for columnIdx := range len(header) {
transposedRows = append(transposedRows, slices.Collect(
xiter.Concat(
hiter.Once(formatTypedHeaderColumn(types[columnIdx])),
xiter.Map(
func(in Row) string {
return lo.Must(lo.Nth(in.Columns, columnIdx))
},
slices.Values(rows),
))))
xiter.Map(
func(in Row) string {
return lo.Must(lo.Nth(in.Columns, columnIdx))
},
xiter.Concat(hiter.Once(toRow(header...)), slices.Values(rows)),
)))
}

widthCounts := calculateWidthCounts(adjustedWidths, transposedRows)
Expand Down Expand Up @@ -714,12 +713,8 @@ type WidthCount struct{ width, count int }
func (wc WidthCount) Length() int { return wc.width }
func (wc WidthCount) Count() int { return wc.count }

func adjustByName(types []*sppb.StructType_Field, availableWidth int) []int {
names := slices.Collect(xiter.Map(
(*sppb.StructType_Field).GetName,
slices.Values(types),
))
nameWidths := slices.Collect(xiter.Map(runewidth.StringWidth, slices.Values(names)))
func adjustByHeader(headers []string, availableWidth int) []int {
nameWidths := slices.Collect(xiter.Map(runewidth.StringWidth, slices.Values(headers)))

adjustWidths, _ := adjustToSum(availableWidth, nameWidths)

Expand All @@ -739,8 +734,17 @@ func printResult(debug bool, screenWidth int, out io.Writer, result *Result, mod
table.SetAlignment(tablewriter.ALIGN_LEFT)
table.SetAutoWrapText(false)

// It is not valid when ColumnType is not populated.
adjustedWidths := calculateOptimalWidth(debug, screenWidth, result.ColumnTypes, result.Rows)
var adjustedWidths []int
if len(result.ColumnTypes) > 0 {
names := slices.Collect(xiter.Map(
(*sppb.StructType_Field).GetName,
slices.Values(result.ColumnTypes),
))
header := slices.Collect(xiter.Map(formatTypedHeaderColumn, slices.Values(result.ColumnTypes)))
adjustedWidths = calculateOptimalWidth(debug, screenWidth, names, slices.Concat(sliceOf(toRow(header...)), result.Rows))
} else {
adjustedWidths = calculateOptimalWidth(debug, screenWidth, result.ColumnNames, slices.Concat(sliceOf(toRow(result.ColumnNames...)), result.Rows))
}

// This condition is true if statement is SelectStatement or DmlStatement
var forceTableRender bool
Expand All @@ -759,15 +763,11 @@ func printResult(debug bool, screenWidth int, out io.Writer, result *Result, mod
}

for _, row := range result.Rows {
if len(result.ColumnTypes) > 0 {
wrappedColumns := slices.Collect(hiter.Unify(
runewidth.Wrap,
hiter.Pairs(slices.Values(row.Columns), slices.Values(adjustedWidths))),
)
table.Append(wrappedColumns)
} else {
table.Append(row.Columns)
}
wrappedColumns := slices.Collect(hiter.Unify(
runewidth.Wrap,
hiter.Pairs(slices.Values(row.Columns), slices.Values(adjustedWidths))),
)
table.Append(wrappedColumns)
}

if forceTableRender || len(result.Rows) > 0 {
Expand Down
150 changes: 132 additions & 18 deletions statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package main

import (
"context"
"encoding/base64"
"errors"
"fmt"
"log"
Expand All @@ -28,6 +29,8 @@ import (
"strings"
"time"

"github.com/ngicks/go-iterator-helper/x/exp/xiter"

"cloud.google.com/go/spanner"
adminpb "cloud.google.com/go/spanner/admin/database/apiv1/databasepb"
sppb "cloud.google.com/go/spanner/apiv1/spannerpb"
Expand Down Expand Up @@ -74,6 +77,7 @@ type Result struct {

// ColumnTypes will be printed in `--verbose` mode if it is not empty
ColumnTypes []*sppb.StructType_Field
ForceWrap bool
}

type Row struct {
Expand Down Expand Up @@ -116,24 +120,28 @@ var (
closeRe = regexp.MustCompile(`(?is)^CLOSE$`)

// Other
exitRe = regexp.MustCompile(`(?is)^EXIT$`)
useRe = regexp.MustCompile(`(?is)^USE\s+([^\s]+)(?:\s+ROLE\s+(.+))?$`)
showLocalProtoRe = regexp.MustCompile(`(?is)^SHOW\s+LOCAL\s+PROTO$`)
showRemoteProtoRe = regexp.MustCompile(`(?is)^SHOW\s+REMOTE\s+PROTO$`)
showDatabasesRe = regexp.MustCompile(`(?is)^SHOW\s+DATABASES$`)
showCreateTableRe = regexp.MustCompile(`(?is)^SHOW\s+CREATE\s+TABLE\s+(.+)$`)
showTablesRe = regexp.MustCompile(`(?is)^SHOW\s+TABLES(?:\s+(.+))?$`)
showColumnsRe = regexp.MustCompile(`(?is)^(?:SHOW\s+COLUMNS\s+FROM)\s+(.+)$`)
showIndexRe = regexp.MustCompile(`(?is)^SHOW\s+(?:INDEX|INDEXES|KEYS)\s+FROM\s+(.+)$`)
explainRe = regexp.MustCompile(`(?is)^EXPLAIN\s+(ANALYZE\s+)?(.+)$`)
describeRe = regexp.MustCompile(`(?is)^DESCRIBE\s+(.+)$`)
showVariableRe = regexp.MustCompile(`(?is)^SHOW\s+VARIABLE\s+(.+)$`)
setParamTypeRe = regexp.MustCompile(`(?is)^SET\s+PARAM\s+([^\s=]+)\s*([^=]*)$`)
setParamRe = regexp.MustCompile(`(?is)^SET\s+PARAM\s+([^\s=]+)\s*=\s*(.*)$`)
setRe = regexp.MustCompile(`(?is)^SET\s+([^\s=]+)\s*=\s*(\S.*)$`)
setAddRe = regexp.MustCompile(`(?is)^SET\s+([^\s+=]+)\s*\+=\s*(\S.*)$`)
showParamsRe = regexp.MustCompile(`(?is)^SHOW\s+PARAMS$`)
showVariablesRe = regexp.MustCompile(`(?is)^SHOW\s+VARIABLES$`)
exitRe = regexp.MustCompile(`(?is)^EXIT$`)
useRe = regexp.MustCompile(`(?is)^USE\s+([^\s]+)(?:\s+ROLE\s+(.+))?$`)
showLocalProtoRe = regexp.MustCompile(`(?is)^SHOW\s+LOCAL\s+PROTO$`)
showRemoteProtoRe = regexp.MustCompile(`(?is)^SHOW\s+REMOTE\s+PROTO$`)
showDatabasesRe = regexp.MustCompile(`(?is)^SHOW\s+DATABASES$`)
showCreateTableRe = regexp.MustCompile(`(?is)^SHOW\s+CREATE\s+TABLE\s+(.+)$`)
showTablesRe = regexp.MustCompile(`(?is)^SHOW\s+TABLES(?:\s+(.+))?$`)
showColumnsRe = regexp.MustCompile(`(?is)^(?:SHOW\s+COLUMNS\s+FROM)\s+(.+)$`)
showIndexRe = regexp.MustCompile(`(?is)^SHOW\s+(?:INDEX|INDEXES|KEYS)\s+FROM\s+(.+)$`)
explainRe = regexp.MustCompile(`(?is)^EXPLAIN\s+(ANALYZE\s+)?(.+)$`)
describeRe = regexp.MustCompile(`(?is)^DESCRIBE\s+(.+)$`)
showVariableRe = regexp.MustCompile(`(?is)^SHOW\s+VARIABLE\s+(.+)$`)
setParamTypeRe = regexp.MustCompile(`(?is)^SET\s+PARAM\s+([^\s=]+)\s*([^=]*)$`)
setParamRe = regexp.MustCompile(`(?is)^SET\s+PARAM\s+([^\s=]+)\s*=\s*(.*)$`)
setRe = regexp.MustCompile(`(?is)^SET\s+([^\s=]+)\s*=\s*(\S.*)$`)
setAddRe = regexp.MustCompile(`(?is)^SET\s+([^\s+=]+)\s*\+=\s*(\S.*)$`)
showParamsRe = regexp.MustCompile(`(?is)^SHOW\s+PARAMS$`)
showVariablesRe = regexp.MustCompile(`(?is)^SHOW\s+VARIABLES$`)
partitionRe = regexp.MustCompile(`(?is)^PARTITION\s(\S.*)$`)
runPartitionedQueryRe = regexp.MustCompile(`(?is)^RUN\s+PARTITIONED\s+QUERY\s(\S.*)$`)
runPartitionRe = regexp.MustCompile(`(?is)^RUN\s+PARTITION\s+('[^']*'|"[^"]*")$`)
tryPartitionedQueryRe = regexp.MustCompile(`(?is)^TRY\s+PARTITIONED\s+QUERY\s(\S.*)$`)
)

var (
Expand Down Expand Up @@ -236,6 +244,18 @@ func BuildCLIStatement(trimmed string) (Statement, error) {
return &ShowParamsStatement{}, nil
case showVariablesRe.MatchString(trimmed):
return &ShowVariablesStatement{}, nil
case partitionRe.MatchString(trimmed):
matched := partitionRe.FindStringSubmatch(trimmed)
return &PartitionStatement{SQL: matched[1]}, nil
case runPartitionedQueryRe.MatchString(trimmed):
matched := runPartitionedQueryRe.FindStringSubmatch(trimmed)
return &RunPartitionedQueryStatement{SQL: matched[1]}, nil
case runPartitionRe.MatchString(trimmed):
matched := runPartitionRe.FindStringSubmatch(trimmed)
return &RunPartitionStatement{Token: unquoteString(matched[1])}, nil
case tryPartitionedQueryRe.MatchString(trimmed):
matched := tryPartitionedQueryRe.FindStringSubmatch(trimmed)
return &TryPartitionedQueryStatement{SQL: matched[1]}, nil
default:
return nil, errStatementNotMatched
}
Expand Down Expand Up @@ -1060,6 +1080,100 @@ func (s *CloseStatement) Execute(ctx context.Context, session *Session) (*Result
return result, nil
}

// PartitionStatement is the parsed form of the `PARTITION <sql>` client-side
// statement. SQL holds the query text that follows the PARTITION keyword.
type PartitionStatement struct{ SQL string }

// Execute partitions s.SQL via session.RunPartitionQuery and returns one row
// per partition containing its partition token, base64-encoded with the
// standard encoding, under the single column "Partition_Token".
//
// On success the batch read-only transaction is intentionally neither cleaned
// up nor closed here (presumably so that the printed tokens remain usable;
// confirm against the Spanner client documentation). On failure no token is
// handed out, so the transaction is released before returning the error.
func (s *PartitionStatement) Execute(ctx context.Context, session *Session) (*Result, error) {
	stmt, err := newStatement(s.SQL, session.systemVariables.Params, false)
	if err != nil {
		return nil, err
	}

	partitions, batchROTx, err := session.RunPartitionQuery(ctx, stmt)
	if err != nil {
		return nil, err
	}

	ts, err := batchROTx.Timestamp()
	if err != nil {
		// Nothing is returned to the user on this path, so nobody can refer
		// to this transaction anymore; release it instead of leaking it.
		batchROTx.Cleanup(ctx)
		batchROTx.Close()
		return nil, err
	}

	rows := slices.Collect(xiter.Map(
		func(partition *spanner.Partition) Row {
			return toRow(base64.StdEncoding.EncodeToString(partition.GetPartitionToken()))
		},
		slices.Values(partitions)))

	return &Result{
		ColumnNames:  sliceOf("Partition_Token"),
		Rows:         rows,
		AffectedRows: len(rows),
		Timestamp:    ts,
		ForceWrap:    true,
	}, nil
}

// TryPartitionedQueryStatement is the parsed form of the
// `TRY PARTITIONED QUERY <sql>` client-side statement.
type TryPartitionedQueryStatement struct {
	// SQL is the query text that follows the TRY PARTITIONED QUERY keywords.
	SQL string
}

// RunPartitionQuery starts a batch read-only transaction and asks it to
// partition stmt using default partition and query options.
//
// The timestamp bound is the session's ReadOnlyStaleness system variable when
// it is set, and a strong read otherwise.
//
// On success it returns the partitions together with the still-open
// transaction; this function does not release it, so the caller decides when
// to call Cleanup/Close. If partitioning fails, the transaction is cleaned up
// and closed here and the error is wrapped with
// "query can't be a partition query".
func (s *Session) RunPartitionQuery(ctx context.Context, stmt spanner.Statement) ([]*spanner.Partition, *spanner.BatchReadOnlyTransaction, error) {
	bound := lo.FromPtrOr(s.systemVariables.ReadOnlyStaleness, spanner.StrongRead())

	tx, err := s.client.BatchReadOnlyTransaction(ctx, bound)
	if err != nil {
		return nil, nil, err
	}

	// Zero-valued options: no partition or query tuning is applied.
	var (
		partitionOpts spanner.PartitionOptions
		queryOpts     spanner.QueryOptions
	)

	partitions, err := tx.PartitionQueryWithOptions(ctx, stmt, partitionOpts, queryOpts)
	if err == nil {
		return partitions, tx, nil
	}

	// The transaction is useless without partitions; release it right away.
	tx.Cleanup(ctx)
	tx.Close()
	return nil, nil, fmt.Errorf("query can't be a partition query: %w", err)
}

// Execute checks whether s.SQL can be run as a partition query by attempting
// to partition it with session.RunPartitionQuery.
//
// When partitioning succeeds it returns a single row "TRUE" under the column
// "Root_Partitionable"; when it fails, the error from RunPartitionQuery is
// returned as is. The batch read-only transaction created for the probe is
// cleaned up and closed before this method returns.
func (s *TryPartitionedQueryStatement) Execute(ctx context.Context, session *Session) (*Result, error) {
	query, err := newStatement(s.SQL, session.systemVariables.Params, false)
	if err != nil {
		return nil, err
	}

	// Only success or failure matters here, so the partitions are discarded.
	_, tx, err := session.RunPartitionQuery(ctx, query)
	if err != nil {
		return nil, err
	}
	defer func() {
		tx.Cleanup(ctx)
		tx.Close()
	}()

	readTimestamp, err := tx.Timestamp()
	if err != nil {
		return nil, err
	}

	result := &Result{
		ColumnNames:  sliceOf("Root_Partitionable"),
		Rows:         sliceOf(toRow("TRUE")),
		AffectedRows: 1,
		Timestamp:    readTimestamp,
		ForceWrap:    true,
	}
	return result, nil
}

// RunPartitionedQueryStatement is the parsed form of the
// `RUN PARTITIONED QUERY <sql>` client-side statement.
type RunPartitionedQueryStatement struct {
	// SQL is the query text that follows the RUN PARTITIONED QUERY keywords.
	SQL string
}

// Execute always fails with an "unsupported statement" error: the statement
// is recognized by the parser but has no implementation here.
func (s *RunPartitionedQueryStatement) Execute(ctx context.Context, session *Session) (*Result, error) {
	err := errors.New("unsupported statement")
	return nil, err
}

// RunPartitionStatement is the parsed form of the `RUN PARTITION <token>`
// client-side statement.
type RunPartitionStatement struct {
	// Token is the partition token given in the statement.
	Token string
}

// Execute always fails with an "unsupported statement" error: the statement
// is recognized by the parser but has no implementation here.
func (s *RunPartitionStatement) Execute(ctx context.Context, session *Session) (*Result, error) {
	err := errors.New("unsupported statement")
	return nil, err
}

type NopStatement struct{}

func (s *NopStatement) Execute(ctx context.Context, session *Session) (*Result, error) {
Expand Down
20 changes: 20 additions & 0 deletions statement_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,26 @@ func TestBuildStatement(t *testing.T) {
input: `EXPLAIN ANALYZE CALL cancel_query("1234567890123456789")`,
want: &ExplainAnalyzeStatement{Query: `CALL cancel_query("1234567890123456789")`},
},
{
desc: "PARTITION statement",
input: `PARTITION SELECT * FROM Singers`,
want: &PartitionStatement{SQL: `SELECT * FROM Singers`},
},
{
desc: "TRY PARTITIONED QUERY statement",
input: `TRY PARTITIONED QUERY SELECT * FROM Singers`,
want: &TryPartitionedQueryStatement{SQL: `SELECT * FROM Singers`},
},
{
desc: "RUN PARTITIONED QUERY statement",
input: `RUN PARTITIONED QUERY SELECT * FROM Singers`,
want: &RunPartitionedQueryStatement{SQL: `SELECT * FROM Singers`},
},
{
desc: "RUN PARTITION statement",
input: `RUN PARTITION '123456789'`,
want: &RunPartitionStatement{Token: `123456789`},
},
{
desc: "SET statement",
input: `SET OPTIMIZER_VERSION = "3"`,
Expand Down

0 comments on commit ea7fdbf

Please sign in to comment.