Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert MySQL datetime string to ISO8601 #122

Merged
merged 7 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion cmd/internal/planetscale_edge_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,16 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc *
tc.Position = ""
}

isFullSync := syncMode == "full"
vtgateReq := buildVStreamRequest(tabletType, s.Name, tc.Shard, tc.Keyspace, tc.Position, tc.LastKnownPk)
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sRequesting VStream with %+v", preamble, vtgateReq))

if isFullSync {
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sWill stop once COPY COMPLETED event is seen.", preamble))
} else {
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sWill stop once stop position [%+v] is found.", preamble, stopPosition))
}

c, err := vtgateClient.VStream(ctx, vtgateReq)

if err != nil {
Expand All @@ -287,7 +295,6 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc *
keyspaceOrDatabase = ps.Database
}

isFullSync := syncMode == "full"
copyCompletedSeen := false
// Can finish sync once we've synced to the stop position, or finished the VStream COPY phase
canFinishSync := false
Expand Down
2 changes: 1 addition & 1 deletion cmd/internal/planetscale_edge_database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2598,7 +2598,7 @@ func TestRead_FullSync_MaxRetries(t *testing.T) {
assert.Equal(t, 4, vsc.vstreamFnInvokedCount)

logLines := tal.logMessages[LOGLEVEL_INFO]
assert.Equal(t, strings.TrimSpace(fmt.Sprintf("[connect-test:primary:customers shard : -] %v records synced after 3 syncs. Got error [DeadlineExceeded], returning with cursor [shard:\"-\" keyspace:\"connect-test\" position:\"MySQL56/e4e20f06-e28f-11ec-8d20-8e7ac09cb64c:1-10\" last_known_pk:{fields:{name:\"id\" type:INT64 charset:63 flags:53251} rows:{lengths:4 values:\"30\"}}] after gRPC error", 30)), strings.TrimSpace(logLines[len(logLines)-1]))
assert.Equal(t, strings.ReplaceAll(fmt.Sprintf("[connect-test:primary:customers shard : -] %v records synced after 3 syncs. Got error [DeadlineExceeded], returning with cursor [shard:\"-\" keyspace:\"connect-test\" position:\"MySQL56/e4e20f06-e28f-11ec-8d20-8e7ac09cb64c:1-10\" last_known_pk:{fields:{name:\"id\" type:INT64 charset:63 flags:53251} rows:{lengths:4 values:\"30\"}}] after gRPC error", 30), " ", ""), strings.ReplaceAll(logLines[len(logLines)-1], " ", ""))
records := tal.records["connect-test.customers"]
assert.Equal(t, 30, len(records))
}
35 changes: 31 additions & 4 deletions cmd/internal/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"regexp"
"strconv"
"strings"
"time"

"github.com/pkg/errors"
psdbconnect "github.com/planetscale/airbyte-source/proto/psdbconnect/v1alpha1"
"github.com/planetscale/psdb/core/codec"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/proto/query"
)

const (
Expand Down Expand Up @@ -142,7 +144,7 @@ func QueryResultToRecords(qr *sqltypes.Result) []map[string]interface{} {
record := make(map[string]interface{})
for idx, val := range row {
if idx < len(columns) {
record[columns[idx]] = parseValue(val, qr.Fields[idx].GetColumnType())
record[columns[idx]] = parseValue(val, qr.Fields[idx].GetColumnType(), qr.Fields[idx].GetType())
}
}
data = append(data, record)
Expand All @@ -153,11 +155,14 @@ func QueryResultToRecords(qr *sqltypes.Result) []map[string]interface{} {

// After the initial COPY phase, enum and set values may appear as an index instead of a value.
// For example, a value might look like a "1" instead of "apple" in an enum('apple','banana','orange') column.
func parseValue(val sqltypes.Value, columnType string) sqltypes.Value {
if strings.HasPrefix(columnType, "enum") {
func parseValue(val sqltypes.Value, columnType string, queryColumnType query.Type) sqltypes.Value {
switch queryColumnType {
case query.Type_DATETIME, query.Type_DATE, query.Type_TIME:
return formatISO8601(queryColumnType, val)
case query.Type_ENUM:
values := parseEnumOrSetValues(columnType)
return mapEnumValue(val, values)
} else if strings.HasPrefix(columnType, "set") {
case query.Type_SET:
values := parseEnumOrSetValues(columnType)
return mapSetValue(val, values)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of the enum and set code above is wrong after vitessio/vitess#15723

That work is in v20+ in OSS and private.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'll remove it once everyone is on 20+ 😄 By "wrong" do you mean "unnecessary" or do you mean it will actually return the wrong values?

Copy link

@mattlord mattlord Feb 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be wrong if the user has string values in the ENUM/SET that can be converted to integers. For example, the string value may be 2 but it's not the index value of 2.

And we can do this work only when necessary by doing it conditional on:

  1. We're NOT in the copy phase
  2. The EnumSetStringValues != TRUE

See: https://github.com/vitessio/vitess/blob/main/changelog/20.0/20.0.0/summary.md#enum-set-vstream

That is exactly what they did in the Debezium connector.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 We will address this in the next PR!

}
Expand All @@ -181,6 +186,28 @@ func parseEnumOrSetValues(columnType string) []string {
return values
}

// formatISO8601 rewrites the MySQL date or datetime string held in value into
// its ISO 8601 form, keeping the value's original type.
//
// DATE values are parsed and emitted as "2006-01-02". Every other type is
// parsed with the MySQL "2006-01-02 15:04:05" layout and emitted as RFC 3339
// (for example "2025-02-14T08:08:08Z"). Fractional seconds present in the
// input are preserved in the output instead of being silently truncated.
//
// If the string does not match the expected layout (for example a bare TIME
// string such as "08:08:08"), or the converted value cannot be constructed,
// the original value is returned unchanged.
func formatISO8601(mysqlType query.Type, value sqltypes.Value) sqltypes.Value {
	// inputLayout is MySQL's textual form; outputLayout is the ISO 8601 form.
	// RFC3339Nano only prints a fraction when one is present, so whole-second
	// values still render as plain RFC 3339.
	inputLayout, outputLayout := "2006-01-02 15:04:05", time.RFC3339Nano
	if mysqlType == query.Type_DATE {
		inputLayout, outputLayout = time.DateOnly, time.DateOnly
	}

	// time.Parse accepts a fractional-second field after the seconds even
	// though inputLayout does not mention one.
	parsed, err := time.Parse(inputLayout, value.ToString())
	if err != nil {
		// Not a parseable date/datetime: pass the raw value through.
		return value
	}

	formatted, err := sqltypes.NewValue(value.Type(), []byte(parsed.Format(outputLayout)))
	if err != nil {
		// Never hand back a zero Value; keep the original instead.
		return value
	}
	return formatted
}

func mapSetValue(value sqltypes.Value, values []string) sqltypes.Value {
parsedValue := value.ToString()
parsedInt, err := strconv.ParseInt(parsedValue, 10, 64)
Expand Down
26 changes: 26 additions & 0 deletions cmd/internal/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,29 @@ func TestCanMapEnumAndSetValues(t *testing.T) {
assert.Equal(t, "active", secondRow["status"].(sqltypes.Value).ToString())
assert.Equal(t, "San Francisco,Oakland", secondRow["locations"].(sqltypes.Value).ToString())
}

func TestCanFormatISO8601Values(t *testing.T) {
datetimeValue, err := sqltypes.NewValue(query.Type_DATETIME, []byte("2025-02-14 08:08:08"))
assert.NoError(t, err)
dateValue, err := sqltypes.NewValue(query.Type_DATE, []byte("2025-02-14"))
assert.NoError(t, err)
timestampValue, err := sqltypes.NewValue(query.Type_TIMESTAMP, []byte("2025-02-14 08:08:08"))
assert.NoError(t, err)
input := sqltypes.Result{
Fields: []*query.Field{
{Name: "datetime_created_at", Type: sqltypes.Datetime, ColumnType: "datetime"},
{Name: "date_created_at", Type: sqltypes.Date, ColumnType: "date"},
{Name: "timestamp_created_at", Type: sqltypes.Time, ColumnType: "timestamp"},
},
Rows: [][]sqltypes.Value{
{datetimeValue, dateValue, timestampValue},
},
}

output := QueryResultToRecords(&input)
assert.Equal(t, 1, len(output))
row := output[0]
assert.Equal(t, "2025-02-14T08:08:08Z", row["datetime_created_at"].(sqltypes.Value).ToString())
assert.Equal(t, "2025-02-14", row["date_created_at"].(sqltypes.Value).ToString())
assert.Equal(t, "2025-02-14T08:08:08Z", row["timestamp_created_at"].(sqltypes.Value).ToString())
}
Loading