diff --git a/examples/local/vstream_client.go b/examples/local/vstream_client.go index ab00f83871d..44041744c74 100644 --- a/examples/local/vstream_client.go +++ b/examples/local/vstream_client.go @@ -38,7 +38,7 @@ import ( */ func main() { ctx := context.Background() - streamCustomer := true + streamCustomer := false var vgtid *binlogdatapb.VGtid if streamCustomer { vgtid = &binlogdatapb.VGtid{ diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index beb556f21f5..064fa060407 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -65,6 +65,11 @@ const tabletPickerContextTimeout = 90 * time.Second // ending the stream from the tablet. const stopOnReshardDelay = 500 * time.Millisecond +type globalTableName struct { + Keyspace string + Table string +} + // vstream contains the metadata for one VStream request. type vstream struct { // mu protects parts of vgtid, the semantics of a send, and journaler. @@ -123,6 +128,9 @@ type vstream struct { // the shard map tracking the copy completion, keyed by streamId. streamId is of the form . copyCompletedShard map[string]struct{} + // A map of initial table schemas (CREATE TABLE) sent to the client if we're copying tables. + copySchemaSent map[globalTableName]struct{} + vsm *vstreamManager eventCh chan []*binlogdatapb.VEvent @@ -595,6 +603,39 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha } } + if sgtid.Gtid == "" { + // We're copying the tables, so let's first send the table schemas in the + // stream so that the client can create the tables if they like and follow + // the full lifecycle of the tables. + if vs.copySchemaSent == nil { + vs.copySchemaSent = make(map[globalTableName]struct{}, 5) + } + ddlevents := make([]*binlogdatapb.VEvent, 0, 5) + if err := tabletConn.GetSchema(ctx, target, querypb.SchemaTableType_TABLES, nil, func(res *querypb.GetSchemaResponse) error { + for tableName, schema := range res.TableDefinition { + key := globalTableName{Keyspace: sgtid.Keyspace, Table: tableName} + func() { + vs.mu.Lock() + defer vs.mu.Unlock() + if _, ok := vs.copySchemaSent[key]; !ok { + ev := &binlogdatapb.VEvent{ + Type: binlogdatapb.VEventType_DDL, + Statement: schema, + } + ddlevents = append(ddlevents, ev) + vs.copySchemaSent[key] = struct{}{} + } + }() + } + return nil + }); err != nil { + return vterrors.Wrapf(err, "failed to get schema for keyspace %s", sgtid.Keyspace) + } + if err := vs.send(ddlevents); err != nil { + return vterrors.Wrapf(err, "failed to send schema DDL events for keyspace %s", sgtid.Keyspace) + } + } + // Safe to access sgtid.Gtid here (because it can't change until streaming begins). req := &binlogdatapb.VStreamRequest{ Target: target,