From 7deb2ef2c8edb687a5664f613eb3a686768e973a Mon Sep 17 00:00:00 2001 From: Frances Thai Date: Fri, 14 Feb 2025 11:31:05 -0800 Subject: [PATCH 1/6] Add log indicating whether we are looking for a stop position or a COPY COMPLETED --- cmd/internal/planetscale_edge_database.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/cmd/internal/planetscale_edge_database.go b/cmd/internal/planetscale_edge_database.go index 121940d..33d90b9 100644 --- a/cmd/internal/planetscale_edge_database.go +++ b/cmd/internal/planetscale_edge_database.go @@ -273,8 +273,16 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc * tc.Position = "" } + isFullSync := syncMode == "full" vtgateReq := buildVStreamRequest(tabletType, s.Name, tc.Shard, tc.Keyspace, tc.Position, tc.LastKnownPk) p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sRequesting VStream with %+v", preamble, vtgateReq)) + + if isFullSync { + p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sWill stop once COPY COMPLETED event is seen.", preamble)) + } else { + p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sWill stop once stop position [%+v] is found.", preamble, stopPosition)) + } + c, err := vtgateClient.VStream(ctx, vtgateReq) if err != nil { @@ -287,7 +295,6 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc * keyspaceOrDatabase = ps.Database } - isFullSync := syncMode == "full" copyCompletedSeen := false // Can finish sync once we've synced to the stop position, or finished the VStream COPY phase canFinishSync := false From 10c7b713f39fa80f027c2768eef1190b0ea980b3 Mon Sep 17 00:00:00 2001 From: Frances Thai Date: Fri, 14 Feb 2025 15:20:47 -0800 Subject: [PATCH 2/6] Add test --- cmd/internal/types.go | 22 ++++++++++++++++++++++ cmd/internal/types_test.go | 26 ++++++++++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/cmd/internal/types.go b/cmd/internal/types.go index 73d6a34..1396773 100644 --- a/cmd/internal/types.go +++ b/cmd/internal/types.go @@ -5,6 +5,7 @@ import ( "regexp" "strconv" "strings" + "time" "github.com/pkg/errors" psdbconnect "github.com/planetscale/airbyte-source/proto/psdbconnect/v1alpha1" @@ -160,6 +161,8 @@ func parseValue(val sqltypes.Value, columnType string) sqltypes.Value { } else if strings.HasPrefix(columnType, "set") { values := parseEnumOrSetValues(columnType) return mapSetValue(val, values) + } else if lowerCased := strings.ToLower(columnType); strings.HasPrefix(lowerCased, "time") || strings.HasPrefix(lowerCased, "date") { + return formatISO8601(lowerCased, val) } return val @@ -181,6 +184,25 @@ func parseEnumOrSetValues(columnType string) []string { return values } +func formatISO8601(mysqlType string, value sqltypes.Value) sqltypes.Value { + parsedDatetime := value.ToString() + + var formatString string + if mysqlType == "date" { + formatString = "2006-01-02" + } else { + formatString = "2006-01-02 15:04:05" + } + mysqlTime, err := time.Parse(formatString, parsedDatetime) + if err != nil { + // fallback to default value if datetime is not parseable + return value + } + iso8601Datetime := mysqlTime.Format(time.RFC3339) + formattedValue, _ := sqltypes.NewValue(value.Type(), []byte(iso8601Datetime)) + return formattedValue +} + func mapSetValue(value sqltypes.Value, values []string) sqltypes.Value { parsedValue := value.ToString() parsedInt, err := strconv.ParseInt(parsedValue, 10, 64) diff --git a/cmd/internal/types_test.go b/cmd/internal/types_test.go index a477027..4c8a832 100644 --- a/cmd/internal/types_test.go +++ b/cmd/internal/types_test.go @@ -108,3 +108,29 @@ func TestCanMapEnumAndSetValues(t *testing.T) { assert.Equal(t, "active", secondRow["status"].(sqltypes.Value).ToString()) assert.Equal(t, "San Francisco,Oakland", secondRow["locations"].(sqltypes.Value).ToString()) } + +func TestCanFormatISO8601Values(t *testing.T) { + datetimeValue, err := sqltypes.NewValue(query.Type_DATETIME, []byte("2025-02-14 08:08:08")) + assert.NoError(t, err) + dateValue, err := sqltypes.NewValue(query.Type_DATE, []byte("2025-02-14")) + assert.NoError(t, err) + timestampValue, err := sqltypes.NewValue(query.Type_TIMESTAMP, []byte("2025-02-14 08:08:08")) + assert.NoError(t, err) + input := sqltypes.Result{ + Fields: []*query.Field{ + {Name: "datetime_created_at", Type: sqltypes.Datetime, ColumnType: "datetime"}, + {Name: "date_created_at", Type: sqltypes.Date, ColumnType: "date"}, + {Name: "timestamp_created_at", Type: sqltypes.Set, ColumnType: "timestamp"}, + }, + Rows: [][]sqltypes.Value{ + {datetimeValue, dateValue, timestampValue}, + }, + } + + output := QueryResultToRecords(&input) + assert.Equal(t, 1, len(output)) + row := output[0] + assert.Equal(t, "2025-02-14T08:08:08Z", row["datetime_created_at"].(sqltypes.Value).ToString()) + assert.Equal(t, "2025-02-14T00:00:00Z", row["date_created_at"].(sqltypes.Value).ToString()) + assert.Equal(t, "2025-02-14T08:08:08Z", row["timestamp_created_at"].(sqltypes.Value).ToString()) +} From 25d1b3b7b3919f8b875346dc938353fb22c3f4b0 Mon Sep 17 00:00:00 2001 From: Frances Thai Date: Tue, 18 Feb 2025 08:33:21 -0800 Subject: [PATCH 3/6] strip all whitespaces before comparing in test --- cmd/internal/planetscale_edge_database_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/internal/planetscale_edge_database_test.go b/cmd/internal/planetscale_edge_database_test.go index f3a2584..41b947e 100644 --- a/cmd/internal/planetscale_edge_database_test.go +++ b/cmd/internal/planetscale_edge_database_test.go @@ -2598,7 +2598,7 @@ func TestRead_FullSync_MaxRetries(t *testing.T) { assert.Equal(t, 4, vsc.vstreamFnInvokedCount) logLines := tal.logMessages[LOGLEVEL_INFO] - assert.Equal(t, strings.TrimSpace(fmt.Sprintf("[connect-test:primary:customers shard : -] %v records synced after 3 syncs. Got error [DeadlineExceeded], returning with cursor [shard:\"-\" keyspace:\"connect-test\" position:\"MySQL56/e4e20f06-e28f-11ec-8d20-8e7ac09cb64c:1-10\" last_known_pk:{fields:{name:\"id\" type:INT64 charset:63 flags:53251} rows:{lengths:4 values:\"30\"}}] after gRPC error", 30)), strings.TrimSpace(logLines[len(logLines)-1])) + assert.Equal(t, strings.ReplaceAll(fmt.Sprintf("[connect-test:primary:customers shard : -] %v records synced after 3 syncs. Got error [DeadlineExceeded], returning with cursor [shard:\"-\" keyspace:\"connect-test\" position:\"MySQL56/e4e20f06-e28f-11ec-8d20-8e7ac09cb64c:1-10\" last_known_pk:{fields:{name:\"id\" type:INT64 charset:63 flags:53251} rows:{lengths:4 values:\"30\"}}] after gRPC error", 30), " ", ""), strings.ReplaceAll(logLines[len(logLines)-1], " ", "")) records := tal.records["connect-test.customers"] assert.Equal(t, 30, len(records)) } From 368bdc2941ff3463146b4ebfc63986b1874b21ce Mon Sep 17 00:00:00 2001 From: Frances Thai Date: Tue, 18 Feb 2025 08:41:14 -0800 Subject: [PATCH 4/6] Use date without time for date --- cmd/internal/types.go | 5 ++++- cmd/internal/types_test.go | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/cmd/internal/types.go b/cmd/internal/types.go index 1396773..d1557c0 100644 --- a/cmd/internal/types.go +++ b/cmd/internal/types.go @@ -188,17 +188,20 @@ func formatISO8601(mysqlType string, value sqltypes.Value) sqltypes.Value { parsedDatetime := value.ToString() var formatString string + var layout string if mysqlType == "date" { formatString = "2006-01-02" + layout = time.DateOnly } else { formatString = "2006-01-02 15:04:05" + layout = time.RFC3339 } mysqlTime, err := time.Parse(formatString, parsedDatetime) if err != nil { // fallback to default value if datetime is not parseable return value } - iso8601Datetime := mysqlTime.Format(time.RFC3339) + iso8601Datetime := mysqlTime.Format(layout) formattedValue, _ := sqltypes.NewValue(value.Type(), []byte(iso8601Datetime)) return formattedValue } diff --git a/cmd/internal/types_test.go b/cmd/internal/types_test.go index 4c8a832..77cc262 100644 --- a/cmd/internal/types_test.go +++ b/cmd/internal/types_test.go @@ -131,6 +131,6 @@ func TestCanFormatISO8601Values(t *testing.T) { assert.Equal(t, 1, len(output)) row := output[0] assert.Equal(t, "2025-02-14T08:08:08Z", row["datetime_created_at"].(sqltypes.Value).ToString()) - assert.Equal(t, "2025-02-14T00:00:00Z", row["date_created_at"].(sqltypes.Value).ToString()) + assert.Equal(t, "2025-02-14", row["date_created_at"].(sqltypes.Value).ToString()) assert.Equal(t, "2025-02-14T08:08:08Z", row["timestamp_created_at"].(sqltypes.Value).ToString()) } From ccee5e1eef1cb01bf1df53116d35e6cf42205665 Mon Sep 17 00:00:00 2001 From: Frances Thai Date: Tue, 18 Feb 2025 09:26:45 -0800 Subject: [PATCH 5/6] Use query type --- cmd/internal/types.go | 17 +++++++++-------- cmd/internal/types_test.go | 2 +- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/cmd/internal/types.go b/cmd/internal/types.go index d1557c0..7b012f5 100644 --- a/cmd/internal/types.go +++ b/cmd/internal/types.go @@ -11,6 +11,7 @@ import ( psdbconnect "github.com/planetscale/airbyte-source/proto/psdbconnect/v1alpha1" "github.com/planetscale/psdb/core/codec" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/proto/query" ) const ( @@ -143,7 +144,7 @@ func QueryResultToRecords(qr *sqltypes.Result) []map[string]interface{} { record := make(map[string]interface{}) for idx, val := range row { if idx < len(columns) { - record[columns[idx]] = parseValue(val, qr.Fields[idx].GetColumnType()) + record[columns[idx]] = parseValue(val, qr.Fields[idx].GetColumnType(), qr.Fields[idx].GetType()) } } data = append(data, record) @@ -154,15 +155,15 @@ func QueryResultToRecords(qr *sqltypes.Result) []map[string]interface{} { // After the initial COPY phase, enum and set values may appear as an index instead of a value. // For example, a value might look like a "1" instead of "apple" in an enum('apple','banana','orange') column) -func parseValue(val sqltypes.Value, columnType string) sqltypes.Value { - if strings.HasPrefix(columnType, "enum") { +func parseValue(val sqltypes.Value, columnType string, queryColumnType query.Type) sqltypes.Value { + if queryColumnType == query.Type_DATETIME || queryColumnType == query.Type_DATE || queryColumnType == query.Type_TIME { + return formatISO8601(queryColumnType, val) + } else if queryColumnType == query.Type_ENUM { values := parseEnumOrSetValues(columnType) return mapEnumValue(val, values) - } else if strings.HasPrefix(columnType, "set") { + } else if queryColumnType == query.Type_SET { values := parseEnumOrSetValues(columnType) return mapSetValue(val, values) - } else if lowerCased := strings.ToLower(columnType); strings.HasPrefix(lowerCased, "time") || strings.HasPrefix(lowerCased, "date") { - return formatISO8601(lowerCased, val) } return val @@ -184,12 +185,12 @@ func parseEnumOrSetValues(columnType string) []string { return values } -func formatISO8601(mysqlType string, value sqltypes.Value) sqltypes.Value { +func formatISO8601(mysqlType query.Type, value sqltypes.Value) sqltypes.Value { parsedDatetime := value.ToString() var formatString string var layout string - if mysqlType == "date" { + if mysqlType == query.Type_DATE { formatString = "2006-01-02" layout = time.DateOnly } else { diff --git a/cmd/internal/types_test.go b/cmd/internal/types_test.go index 77cc262..eb6b0d5 100644 --- a/cmd/internal/types_test.go +++ b/cmd/internal/types_test.go @@ -120,7 +120,7 @@ func TestCanFormatISO8601Values(t *testing.T) { Fields: []*query.Field{ {Name: "datetime_created_at", Type: sqltypes.Datetime, ColumnType: "datetime"}, {Name: "date_created_at", Type: sqltypes.Date, ColumnType: "date"}, - {Name: "timestamp_created_at", Type: sqltypes.Set, ColumnType: "timestamp"}, + {Name: "timestamp_created_at", Type: sqltypes.Time, ColumnType: "timestamp"}, }, Rows: [][]sqltypes.Value{ {datetimeValue, dateValue, timestampValue}, From 3612a670589eec55ce36964a0f5cec3476cc1e51 Mon Sep 17 00:00:00 2001 From: Frances Thai Date: Tue, 18 Feb 2025 11:09:20 -0800 Subject: [PATCH 6/6] Use SWITCH statement instead --- cmd/internal/types.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/cmd/internal/types.go b/cmd/internal/types.go index 7b012f5..e31f28b 100644 --- a/cmd/internal/types.go +++ b/cmd/internal/types.go @@ -156,12 +156,13 @@ func QueryResultToRecords(qr *sqltypes.Result) []map[string]interface{} { // After the initial COPY phase, enum and set values may appear as an index instead of a value. // For example, a value might look like a "1" instead of "apple" in an enum('apple','banana','orange') column) func parseValue(val sqltypes.Value, columnType string, queryColumnType query.Type) sqltypes.Value { - if queryColumnType == query.Type_DATETIME || queryColumnType == query.Type_DATE || queryColumnType == query.Type_TIME { + switch queryColumnType { + case query.Type_DATETIME, query.Type_DATE, query.Type_TIME: return formatISO8601(queryColumnType, val) - } else if queryColumnType == query.Type_ENUM { + case query.Type_ENUM: values := parseEnumOrSetValues(columnType) return mapEnumValue(val, values) - } else if queryColumnType == query.Type_SET { + case query.Type_SET: values := parseEnumOrSetValues(columnType) return mapSetValue(val, values) }