launcher.go
package main

import (
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"io"
	"net/url"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	dataflow "cloud.google.com/go/dataflow/apiv1beta3"
	"cloud.google.com/go/dataflow/apiv1beta3/dataflowpb"
	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/spanner"
	database "cloud.google.com/go/spanner/admin/database/apiv1"
	"cloud.google.com/go/storage"
	"google.golang.org/api/iterator"
	adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1"
)
/*
TODO: Add modes for running, such as:
  - Launch only the Dataflow jobs and skip the change stream and Pub/Sub setup.
  - Launch only the ordering Dataflow job.
  - Launch only the writer Dataflow job.
*/
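// Example invocation (the flag values below are illustrative only; see
// setupGlobalFlags for the full list of flags and their defaults):
//
//	go run launcher.go \
//	  -projectId=my-project -dataflowRegion=us-central1 \
//	  -instanceId=my-instance -dbName=my-db \
//	  -sourceShardsFilePath=gs://my-bucket/shards.json \
//	  -sessionFilePath=gs://my-bucket/session.json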
var (
	projectId            string
	dataflowRegion       string
	jobNamePrefix        string
	changeStreamName     string
	instanceId           string
	dbName               string
	metadataInstance     string
	metadataDatabase     string
	startTimestamp       string
	pubSubDataTopicId    string
	pubSubEndpoint       string
	sourceShardsFilePath string
	sessionFilePath      string
	machineType          string
	vpcNetwork           string
	vpcSubnetwork        string
	vpcHostProjectId     string
	serviceAccountEmail  string
	orderingWorkers      int
	writerWorkers        int
	networkTags          string
	filtrationMode       string
)

const (
	ALREADY_EXISTS_ERROR = "code = AlreadyExists"
)
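// setupGlobalFlags registers the command-line flags that configure the
// reverse replication pipeline. Flag values are stored in the package-level
// variables above and validated later by prechecks.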
func setupGlobalFlags() {
	flag.StringVar(&projectId, "projectId", "", "projectId")
	flag.StringVar(&dataflowRegion, "dataflowRegion", "", "region for dataflow jobs")
	flag.StringVar(&jobNamePrefix, "jobNamePrefix", "reverse-rep", "job name prefix for the dataflow jobs, defaults to reverse-rep. Automatically converted to lower case due to Dataflow name constraints.")
	flag.StringVar(&changeStreamName, "changeStreamName", "reverseReplicationStream", "change stream name, defaults to reverseReplicationStream")
	flag.StringVar(&instanceId, "instanceId", "", "spanner instance id")
	flag.StringVar(&dbName, "dbName", "", "spanner database name")
	flag.StringVar(&metadataInstance, "metadataInstance", "", "spanner instance name to store changestream metadata, defaults to target Spanner instance")
	flag.StringVar(&metadataDatabase, "metadataDatabase", "change-stream-metadata", "spanner database name to store changestream metadata, defaults to change-stream-metadata")
	flag.StringVar(&startTimestamp, "startTimestamp", "", "timestamp from which the changestream should start reading changes, in RFC 3339 format. Defaults to empty string, which is equivalent to the current timestamp.")
	flag.StringVar(&pubSubDataTopicId, "pubSubDataTopicId", "reverse-replication", "pub/sub data topic id. DO NOT INCLUDE the prefix 'projects/<project_name>/topics/'. Defaults to 'reverse-replication'")
	flag.StringVar(&pubSubEndpoint, "pubSubEndpoint", "", "pub/sub endpoint, defaults to the same endpoint as the dataflow region.")
	flag.StringVar(&sourceShardsFilePath, "sourceShardsFilePath", "", "gcs file path for the file containing shard info")
	flag.StringVar(&sessionFilePath, "sessionFilePath", "", "gcs file path for the session file generated via Spanner migration tool")
	flag.StringVar(&machineType, "machineType", "n2-standard-4", "dataflow worker machine type, defaults to n2-standard-4")
	flag.StringVar(&vpcNetwork, "vpcNetwork", "", "name of the VPC network to be used for the dataflow jobs")
	flag.StringVar(&vpcSubnetwork, "vpcSubnetwork", "", "name of the VPC subnetwork to be used for the dataflow jobs. The subnet should exist in the same region as the 'dataflowRegion' parameter")
	flag.StringVar(&vpcHostProjectId, "vpcHostProjectId", "", "project ID hosting the subnetwork. If unspecified, the 'projectId' parameter value will be used for the subnetwork.")
	flag.StringVar(&serviceAccountEmail, "serviceAccountEmail", "", "the email address of the service account to run the job as")
	flag.IntVar(&orderingWorkers, "orderingWorkers", 5, "number of workers for the ordering job")
	flag.IntVar(&writerWorkers, "writerWorkers", 5, "number of workers for the writer job")
	flag.StringVar(&networkTags, "networkTags", "", "network tags added to the Dataflow jobs' worker and launcher VMs")
	flag.StringVar(&filtrationMode, "filtrationMode", "forward_migration", "whether to filter forward migrated data or not. Supported values are forward_migration and none, defaults to 'forward_migration'")
}
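// prechecks validates the parsed flag values, applies defaults for optional
// flags (metadata instance/database, machine type, Pub/Sub endpoint, VPC host
// project), and normalizes the job name prefix to lower case.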
func prechecks() error {
	if projectId == "" {
		return fmt.Errorf("please specify a valid projectId")
	}
	if dataflowRegion == "" {
		return fmt.Errorf("please specify a valid dataflowRegion")
	}
	if jobNamePrefix == "" {
		return fmt.Errorf("please specify a non-empty jobNamePrefix")
	}
	// Capital letters are not allowed in Dataflow job names.
	jobNamePrefix = strings.ToLower(jobNamePrefix)
	if changeStreamName == "" {
		return fmt.Errorf("please specify a valid changeStreamName")
	}
	if instanceId == "" {
		return fmt.Errorf("please specify a valid instanceId")
	}
	if dbName == "" {
		return fmt.Errorf("please specify a valid dbName")
	}
	if metadataInstance == "" {
		metadataInstance = instanceId
		fmt.Println("metadataInstance not provided, defaulting to target spanner instance id:", metadataInstance)
	}
	if metadataDatabase == "" {
		metadataDatabase = "change-stream-metadata"
		fmt.Println("metadataDatabase not provided, defaulting to:", metadataDatabase)
	}
	if pubSubDataTopicId == "" {
		return fmt.Errorf("please specify a valid pubSubDataTopicId")
	}
	if strings.Contains(pubSubDataTopicId, "/") {
		return fmt.Errorf("please specify a valid pubSubDataTopicId. '/' is not a valid character for a topic id. DO NOT INCLUDE the prefix 'projects/<project_name>/topics/' for this flag")
	}
	if sourceShardsFilePath == "" {
		return fmt.Errorf("please specify a valid sourceShardsFilePath")
	}
	if sessionFilePath == "" {
		return fmt.Errorf("please specify a valid sessionFilePath")
	}
	if machineType == "" {
		machineType = "n2-standard-4"
		fmt.Println("machineType not provided, defaulting to:", machineType)
	}
	if pubSubEndpoint == "" {
		pubSubEndpoint = fmt.Sprintf("%s-pubsub.googleapis.com:443", dataflowRegion)
	}
	if vpcHostProjectId == "" {
		vpcHostProjectId = projectId
	}
	return nil
}
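// main orchestrates the end-to-end setup: it validates flags, ensures the
// change stream and metadata database exist, creates the Pub/Sub topic and
// per-shard subscriptions, and finally launches the ordering and writer
// Dataflow flex template jobs.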
func main() {
	fmt.Println("Setting up reverse replication pipeline...")

	ORDERING_TEMPLATE := "gs://dataflow-templates/2023-10-12-00_RC00/flex/Spanner_Change_Streams_to_Sink"
	WRITER_TEMPLATE := "gs://dataflow-templates/2023-10-12-00_RC00/flex/Ordered_Changestream_Buffer_to_Sourcedb"

	setupGlobalFlags()
	flag.Parse()

	err := prechecks()
	if err != nil {
		fmt.Println("incorrect arguments passed:", err)
		return
	}

	dbUri := fmt.Sprintf("projects/%s/instances/%s/databases/%s", projectId, instanceId, dbName)
	ctx := context.Background()
	adminClient, err := database.NewDatabaseAdminClient(ctx)
	if err != nil {
		fmt.Println("could not create database admin client:", err)
		return
	}
	defer adminClient.Close()
	spClient, err := spanner.NewClient(ctx, dbUri)
	if err != nil {
		fmt.Println("could not create spanner client:", err)
		return
	}
	defer spClient.Close()

	err = validateOrCreateChangeStream(ctx, adminClient, spClient, dbUri)
	if err != nil {
		fmt.Println("Error in validating/creating changestream:", err)
		return
	}
	metadataDbUri := fmt.Sprintf("projects/%s/instances/%s/databases/%s", projectId, metadataInstance, metadataDatabase)
	createDbReq := &adminpb.CreateDatabaseRequest{
		Parent:          fmt.Sprintf("projects/%s/instances/%s", projectId, metadataInstance),
		CreateStatement: fmt.Sprintf("CREATE DATABASE `%s`", metadataDatabase),
	}
	createDbOp, err := adminClient.CreateDatabase(ctx, createDbReq)
	if err != nil {
		if !strings.Contains(err.Error(), ALREADY_EXISTS_ERROR) {
			fmt.Printf("Cannot submit create database request for metadata db: %v\n", err)
			return
		}
		fmt.Printf("metadata db %s already exists...skipping creation\n", metadataDbUri)
	} else if _, err := createDbOp.Wait(ctx); err != nil {
		if !strings.Contains(err.Error(), ALREADY_EXISTS_ERROR) {
			fmt.Printf("create database request failed for metadata db: %v\n", err)
			return
		}
		fmt.Printf("metadata db %s already exists...skipping creation\n", metadataDbUri)
	} else {
		fmt.Println("Created metadata db", metadataDbUri)
	}
	// Read the shard configuration from GCS; each entry's logicalShardId
	// becomes the name of a Pub/Sub subscription below.
	gcsclient, err := storage.NewClient(ctx)
	if err != nil {
		fmt.Println("could not create GCS client:", err)
		return
	}
	defer gcsclient.Close()
	u, err := url.Parse(sourceShardsFilePath)
	if err != nil {
		fmt.Println("could not parse sourceShardsFilePath:", err)
		return
	}
	rc, err := gcsclient.Bucket(u.Host).Object(u.Path[1:]).NewReader(ctx)
	if err != nil {
		fmt.Println("could not open source shards file:", err)
		return
	}
	bArr, err := io.ReadAll(rc)
	rc.Close()
	if err != nil {
		fmt.Println("could not read source shards file:", err)
		return
	}
	var data []interface{}
	if err := json.Unmarshal(bArr, &data); err != nil {
		fmt.Println("could not unmarshal source shards file:", err)
		return
	}
	arr := []string{}
	for i := 0; i < len(data); i++ {
		arr = append(arr, data[i].(map[string]interface{})["logicalShardId"].(string))
	}

	pubSubDataTopicUri := fmt.Sprintf("projects/%s/topics/%s", projectId, pubSubDataTopicId)
	topicName := pubSubDataTopicId
	client, err := pubsub.NewClient(ctx, projectId)
	if err != nil {
		fmt.Println("could not create pubsub client:", err)
		return
	}
	defer client.Close()
	_, err = client.CreateTopic(ctx, topicName)
	if err != nil {
		if !strings.Contains(err.Error(), ALREADY_EXISTS_ERROR) {
			fmt.Printf("could not create topic: %v\n", err)
			return
		}
		fmt.Printf("topic '%s' already exists, skipping creation...\n", topicName)
	} else {
		fmt.Println("Created topic", pubSubDataTopicUri)
	}
	// subError is written from multiple goroutines, so use an atomic flag to
	// avoid a data race.
	var subError atomic.Bool
	wg := &sync.WaitGroup{}
	for i := 0; i < len(arr); i++ {
		wg.Add(1)
		go func(shardId string) {
			defer wg.Done()
			_, err := client.CreateSubscription(ctx, shardId, pubsub.SubscriptionConfig{
				Topic:                 client.Topic(topicName),
				AckDeadline:           600 * time.Second,
				EnableMessageOrdering: true,
				Filter:                fmt.Sprintf("attributes.shardId=\"%s\"", shardId),
			})
			if err != nil {
				if !strings.Contains(err.Error(), ALREADY_EXISTS_ERROR) {
					fmt.Printf("could not create subscription: %v\n", err)
					subError.Store(true)
					return
				}
				if err := verifySubscription(ctx, client, shardId); err != nil {
					fmt.Printf("subscription '%s' already exists, but is configured incorrectly: %v\n", shardId, err)
					subError.Store(true)
					return
				}
				fmt.Printf("subscription '%s' already exists, skipping creation\n", shardId)
				return
			}
			fmt.Println("Created Pub/Sub subscription:", shardId)
		}(arr[i])
	}
	wg.Wait()
	if subError.Load() {
		fmt.Printf("error in creating/validating subscriptions\n")
		return
	}
	c, err := dataflow.NewFlexTemplatesClient(ctx)
	if err != nil {
		fmt.Printf("could not create flex template client: %v\n", err)
		return
	}
	defer c.Close()

	// If a custom network is not selected, use a public IP. Typical for internal testing flows.
	workerIpAddressConfig := dataflowpb.WorkerIPAddressConfiguration_WORKER_IP_PUBLIC
	if vpcNetwork != "" || vpcSubnetwork != "" {
		workerIpAddressConfig = dataflowpb.WorkerIPAddressConfiguration_WORKER_IP_PRIVATE
		// If the subnetwork is not provided, assume the network has auto subnet configuration.
		if vpcSubnetwork != "" {
			vpcSubnetwork = fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/subnetworks/%s", vpcHostProjectId, dataflowRegion, vpcSubnetwork)
		}
	}

	var additionalExpr []string
	if networkTags == "" {
		additionalExpr = []string{"use_runner_v2"}
	} else {
		additionalExpr = []string{"use_runner_v2", "use_network_tags=" + networkTags, "use_network_tags_for_flex_templates=" + networkTags}
	}
	launchParameters := &dataflowpb.LaunchFlexTemplateParameter{
		JobName:  fmt.Sprintf("%s-ordering", jobNamePrefix),
		Template: &dataflowpb.LaunchFlexTemplateParameter_ContainerSpecGcsPath{ContainerSpecGcsPath: ORDERING_TEMPLATE},
		Parameters: map[string]string{
			"changeStreamName":   changeStreamName,
			"instanceId":         instanceId,
			"databaseId":         dbName,
			"spannerProjectId":   projectId,
			"metadataInstance":   metadataInstance,
			"metadataDatabase":   metadataDatabase,
			"startTimestamp":     startTimestamp,
			"incrementInterval":  "10",
			"sinkType":           "pubsub",
			"pubSubDataTopicId":  pubSubDataTopicUri,
			"pubSubErrorTopicId": pubSubDataTopicUri,
			"pubSubEndpoint":     pubSubEndpoint,
			"sessionFilePath":    sessionFilePath,
			"filtrationMode":     filtrationMode,
		},
		Environment: &dataflowpb.FlexTemplateRuntimeEnvironment{
			NumWorkers:            int32(orderingWorkers),
			AdditionalExperiments: additionalExpr,
			MachineType:           machineType,
			Network:               vpcNetwork,
			Subnetwork:            vpcSubnetwork,
			IpConfiguration:       workerIpAddressConfig,
			ServiceAccountEmail:   serviceAccountEmail,
		},
	}
	req := &dataflowpb.LaunchFlexTemplateRequest{
		ProjectId:       projectId,
		LaunchParameter: launchParameters,
		Location:        dataflowRegion,
	}
	fmt.Printf("\nGCLOUD CMD FOR ORDERING JOB:\n%s\n\n", getGcloudCommand(req, ORDERING_TEMPLATE))

	_, err = c.LaunchFlexTemplate(ctx, req)
	if err != nil {
		fmt.Printf("unable to launch ordering job: %v \n REQUEST BODY: %+v\n", err, req)
		return
	}
	fmt.Println("Launched ordering job:", fmt.Sprintf("%s-ordering", jobNamePrefix))
	launchParameters = &dataflowpb.LaunchFlexTemplateParameter{
		JobName:  fmt.Sprintf("%s-writer", jobNamePrefix),
		Template: &dataflowpb.LaunchFlexTemplateParameter_ContainerSpecGcsPath{ContainerSpecGcsPath: WRITER_TEMPLATE},
		Parameters: map[string]string{
			"sourceShardsFilePath": sourceShardsFilePath,
			"sessionFilePath":      sessionFilePath,
			"bufferType":           "pubsub",
			"pubSubProjectId":      projectId,
		},
		Environment: &dataflowpb.FlexTemplateRuntimeEnvironment{
			NumWorkers:            int32(writerWorkers),
			AdditionalExperiments: additionalExpr,
			MachineType:           machineType,
			Network:               vpcNetwork,
			Subnetwork:            vpcSubnetwork,
			IpConfiguration:       workerIpAddressConfig,
			ServiceAccountEmail:   serviceAccountEmail,
		},
	}
	req = &dataflowpb.LaunchFlexTemplateRequest{
		ProjectId:       projectId,
		LaunchParameter: launchParameters,
		Location:        dataflowRegion,
	}
	fmt.Printf("\nGCLOUD CMD FOR WRITER JOB:\n%s\n\n", getGcloudCommand(req, WRITER_TEMPLATE))

	_, err = c.LaunchFlexTemplate(ctx, req)
	if err != nil {
		fmt.Printf("unable to launch writer job: %v \n REQUEST BODY: %+v\n", err, req)
		return
	}
	fmt.Println("Launched writer job:", fmt.Sprintf("%s-writer", jobNamePrefix))
}
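// verifySubscription checks that a pre-existing subscription is usable for
// reverse replication: it must consume from the configured data topic, have
// message ordering enabled, and filter on the shard id it is named after.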
func verifySubscription(ctx context.Context, client *pubsub.Client, subName string) error {
	subscription := client.Subscription(subName)
	subCfg, err := subscription.Config(ctx)
	if err != nil {
		return fmt.Errorf("cannot fetch config for subscription %s", subName)
	}
	topicUri := subCfg.Topic.String()
	if topicUri != fmt.Sprintf("projects/%s/topics/%s", projectId, pubSubDataTopicId) {
		return fmt.Errorf("pubSubDataTopicId provided was %s, but existing subscription %s receives from %s. Please change pubSubDataTopicId or delete the subscription", pubSubDataTopicId, subName, topicUri)
	}
	if !subCfg.EnableMessageOrdering {
		return fmt.Errorf("existing subscription %s has EnableMessageOrdering set to false. Please update or delete the subscription", subName)
	}
	if !strings.Contains(subCfg.Filter, fmt.Sprintf("attributes.shardId=\"%s\"", subName)) {
		return fmt.Errorf("existing subscription %s does not have the correct filter. Please delete the subscription", subName)
	}
	return nil
}
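// validateOrCreateChangeStream looks up the configured change stream in the
// database's information schema. If it does not exist, it is created; if it
// does, its value_capture_type must be NEW_ROW, and a warning is printed when
// it does not watch ALL tables.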
func validateOrCreateChangeStream(ctx context.Context, adminClient *database.DatabaseAdminClient, spClient *spanner.Client, dbUri string) error {
	q := `SELECT * FROM information_schema.change_streams`
	stmt := spanner.Statement{
		SQL: q,
	}
	iter := spClient.Single().Query(ctx, stmt)
	defer iter.Stop()
	var csCatalog, csSchema, csName string
	var coversAll bool
	csExists := false
	for {
		row, err := iter.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return fmt.Errorf("couldn't read row from change_streams table: %w", err)
		}
		err = row.Columns(&csCatalog, &csSchema, &csName, &coversAll)
		if err != nil {
			return fmt.Errorf("can't scan row from change_streams table: %w", err)
		}
		if csName == changeStreamName {
			csExists = true
			fmt.Printf("Found changestream %s\n", changeStreamName)
			break
		}
	}
	if !csExists {
		fmt.Printf("changestream %s not found\n", changeStreamName)
		err := createChangeStream(ctx, adminClient, dbUri)
		if err != nil {
			return fmt.Errorf("could not create changestream: %w", err)
		}
		return nil
	}
	q = `SELECT option_value FROM information_schema.change_stream_options WHERE change_stream_name = @p1 AND option_name = 'value_capture_type'`
	stmt = spanner.Statement{
		SQL: q,
		Params: map[string]interface{}{
			"p1": changeStreamName,
		},
	}
	iter = spClient.Single().Query(ctx, stmt)
	defer iter.Stop()
	var optionValue string
	for {
		row, err := iter.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return fmt.Errorf("couldn't read row from change_stream_options table: %w", err)
		}
		err = row.Columns(&optionValue)
		if err != nil {
			return fmt.Errorf("can't scan row from change_stream_options table: %w", err)
		}
		if optionValue != "NEW_ROW" {
			return fmt.Errorf("VALUE_CAPTURE_TYPE for changestream %s is not NEW_ROW. Please update the changestream option or create a new one", changeStreamName)
		}
	}
	if !coversAll {
		fmt.Printf("\nWARNING: the watching definition for the existing changestream %s is not 'ALL'."+
			" This means only specific tables and columns are tracked."+
			" Only the tables and columns watched by this changestream will get reverse replicated.\n\n", changeStreamName)
	}
	fmt.Println("Skipping changestream creation ...")
	return nil
}
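// createChangeStream issues the DDL to create a change stream watching ALL
// tables with value_capture_type = 'NEW_ROW' and waits for the operation to
// complete.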
func createChangeStream(ctx context.Context, adminClient *database.DatabaseAdminClient, dbUri string) error {
	fmt.Println("Creating changestream")
	op, err := adminClient.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
		Database: dbUri,
		// TODO: create the change stream for only the tables present in Spanner.
		Statements: []string{fmt.Sprintf("CREATE CHANGE STREAM %s FOR ALL OPTIONS (value_capture_type = 'NEW_ROW')", changeStreamName)},
	})
	if err != nil {
		return fmt.Errorf("cannot submit create change stream request: %v", err)
	}
	if err := op.Wait(ctx); err != nil {
		return fmt.Errorf("could not update database ddl: %v", err)
	}
	fmt.Println("Successfully created changestream", changeStreamName)
	return nil
}
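// getGcloudCommand renders an equivalent `gcloud dataflow flex-template run`
// command for the given launch request, so the launch can be reproduced or
// debugged from the CLI.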
func getGcloudCommand(req *dataflowpb.LaunchFlexTemplateRequest, templatePath string) string {
	lp := req.LaunchParameter
	params := ""
	for k, v := range lp.Parameters {
		params = params + k + "=" + v + ","
	}
	params = strings.TrimSuffix(params, ",")
	cmd := fmt.Sprintf("gcloud dataflow flex-template run %s --project=%s --region=%s --template-file-gcs-location=%s --parameters %s --num-workers=%d --worker-machine-type=%s",
		lp.JobName, req.ProjectId, req.Location, templatePath, params, lp.Environment.NumWorkers, lp.Environment.MachineType)
	if lp.Environment.AdditionalExperiments != nil {
		cmd += " --additional-experiments=" + strings.Join(lp.Environment.AdditionalExperiments, ",")
	}
	return cmd
}