diff --git a/cmd/internal/planetscale_edge_database.go b/cmd/internal/planetscale_edge_database.go index 121940d..33d90b9 100644 --- a/cmd/internal/planetscale_edge_database.go +++ b/cmd/internal/planetscale_edge_database.go @@ -273,8 +273,16 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc * tc.Position = "" } + isFullSync := syncMode == "full" vtgateReq := buildVStreamRequest(tabletType, s.Name, tc.Shard, tc.Keyspace, tc.Position, tc.LastKnownPk) p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sRequesting VStream with %+v", preamble, vtgateReq)) + + if isFullSync { + p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sWill stop once COPY COMPLETED event is seen.", preamble)) + } else { + p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("%sWill stop once stop position [%+v] is found.", preamble, stopPosition)) + } + c, err := vtgateClient.VStream(ctx, vtgateReq) if err != nil { @@ -287,7 +295,6 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, syncMode string, tc * keyspaceOrDatabase = ps.Database } - isFullSync := syncMode == "full" copyCompletedSeen := false // Can finish sync once we've synced to the stop position, or finished the VStream COPY phase canFinishSync := false diff --git a/cmd/internal/planetscale_edge_database_test.go b/cmd/internal/planetscale_edge_database_test.go index f3a2584..41b947e 100644 --- a/cmd/internal/planetscale_edge_database_test.go +++ b/cmd/internal/planetscale_edge_database_test.go @@ -2598,7 +2598,7 @@ func TestRead_FullSync_MaxRetries(t *testing.T) { assert.Equal(t, 4, vsc.vstreamFnInvokedCount) logLines := tal.logMessages[LOGLEVEL_INFO] - assert.Equal(t, strings.TrimSpace(fmt.Sprintf("[connect-test:primary:customers shard : -] %v records synced after 3 syncs. Got error [DeadlineExceeded], returning with cursor [shard:\"-\" keyspace:\"connect-test\" position:\"MySQL56/e4e20f06-e28f-11ec-8d20-8e7ac09cb64c:1-10\" last_known_pk:{fields:{name:\"id\" type:INT64 charset:63 flags:53251} rows:{lengths:4 values:\"30\"}}] after gRPC error", 30)), strings.TrimSpace(logLines[len(logLines)-1])) + assert.Equal(t, strings.ReplaceAll(fmt.Sprintf("[connect-test:primary:customers shard : -] %v records synced after 3 syncs. Got error [DeadlineExceeded], returning with cursor [shard:\"-\" keyspace:\"connect-test\" position:\"MySQL56/e4e20f06-e28f-11ec-8d20-8e7ac09cb64c:1-10\" last_known_pk:{fields:{name:\"id\" type:INT64 charset:63 flags:53251} rows:{lengths:4 values:\"30\"}}] after gRPC error", 30), " ", ""), strings.ReplaceAll(logLines[len(logLines)-1], " ", "")) records := tal.records["connect-test.customers"] assert.Equal(t, 30, len(records)) } diff --git a/cmd/internal/types.go b/cmd/internal/types.go index 73d6a34..e31f28b 100644 --- a/cmd/internal/types.go +++ b/cmd/internal/types.go @@ -5,11 +5,13 @@ import ( "regexp" "strconv" "strings" + "time" "github.com/pkg/errors" psdbconnect "github.com/planetscale/airbyte-source/proto/psdbconnect/v1alpha1" "github.com/planetscale/psdb/core/codec" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/proto/query" ) const ( @@ -142,7 +144,7 @@ func QueryResultToRecords(qr *sqltypes.Result) []map[string]interface{} { record := make(map[string]interface{}) for idx, val := range row { if idx < len(columns) { - record[columns[idx]] = parseValue(val, qr.Fields[idx].GetColumnType()) + record[columns[idx]] = parseValue(val, qr.Fields[idx].GetColumnType(), qr.Fields[idx].GetType()) } } data = append(data, record) @@ -153,11 +155,14 @@ func QueryResultToRecords(qr *sqltypes.Result) []map[string]interface{} { // After the initial COPY phase, enum and set values may appear as an index instead of a value. // For example, a value might look like a "1" instead of "apple" in an enum('apple','banana','orange') column) -func parseValue(val sqltypes.Value, columnType string) sqltypes.Value { - if strings.HasPrefix(columnType, "enum") { +func parseValue(val sqltypes.Value, columnType string, queryColumnType query.Type) sqltypes.Value { + switch queryColumnType { + case query.Type_DATETIME, query.Type_DATE, query.Type_TIME: + return formatISO8601(queryColumnType, val) + case query.Type_ENUM: values := parseEnumOrSetValues(columnType) return mapEnumValue(val, values) - } else if strings.HasPrefix(columnType, "set") { + case query.Type_SET: values := parseEnumOrSetValues(columnType) return mapSetValue(val, values) } @@ -181,6 +186,28 @@ func parseEnumOrSetValues(columnType string) []string { return values } +func formatISO8601(mysqlType query.Type, value sqltypes.Value) sqltypes.Value { + parsedDatetime := value.ToString() + + var formatString string + var layout string + if mysqlType == query.Type_DATE { + formatString = "2006-01-02" + layout = time.DateOnly + } else { + formatString = "2006-01-02 15:04:05" + layout = time.RFC3339 + } + mysqlTime, err := time.Parse(formatString, parsedDatetime) + if err != nil { + // fallback to default value if datetime is not parseable + return value + } + iso8601Datetime := mysqlTime.Format(layout) + formattedValue, _ := sqltypes.NewValue(value.Type(), []byte(iso8601Datetime)) + return formattedValue +} + func mapSetValue(value sqltypes.Value, values []string) sqltypes.Value { parsedValue := value.ToString() parsedInt, err := strconv.ParseInt(parsedValue, 10, 64) diff --git a/cmd/internal/types_test.go b/cmd/internal/types_test.go index a477027..eb6b0d5 100644 --- a/cmd/internal/types_test.go +++ b/cmd/internal/types_test.go @@ -108,3 +108,29 @@ func TestCanMapEnumAndSetValues(t *testing.T) { assert.Equal(t, "active", secondRow["status"].(sqltypes.Value).ToString()) assert.Equal(t, "San Francisco,Oakland", secondRow["locations"].(sqltypes.Value).ToString()) } + +func TestCanFormatISO8601Values(t *testing.T) { + datetimeValue, err := sqltypes.NewValue(query.Type_DATETIME, []byte("2025-02-14 08:08:08")) + assert.NoError(t, err) + dateValue, err := sqltypes.NewValue(query.Type_DATE, []byte("2025-02-14")) + assert.NoError(t, err) + timestampValue, err := sqltypes.NewValue(query.Type_TIMESTAMP, []byte("2025-02-14 08:08:08")) + assert.NoError(t, err) + input := sqltypes.Result{ + Fields: []*query.Field{ + {Name: "datetime_created_at", Type: sqltypes.Datetime, ColumnType: "datetime"}, + {Name: "date_created_at", Type: sqltypes.Date, ColumnType: "date"}, + {Name: "timestamp_created_at", Type: sqltypes.Time, ColumnType: "timestamp"}, + }, + Rows: [][]sqltypes.Value{ + {datetimeValue, dateValue, timestampValue}, + }, + } + + output := QueryResultToRecords(&input) + assert.Equal(t, 1, len(output)) + row := output[0] + assert.Equal(t, "2025-02-14T08:08:08Z", row["datetime_created_at"].(sqltypes.Value).ToString()) + assert.Equal(t, "2025-02-14", row["date_created_at"].(sqltypes.Value).ToString()) + assert.Equal(t, "2025-02-14T08:08:08Z", row["timestamp_created_at"].(sqltypes.Value).ToString()) +}