Skip to content

Commit a6e7930

Browse files
authored
Merge branch 'main' into distinct-flow-names
2 parents 761886a + 2271a1b commit a6e7930

File tree

11 files changed

+206
-31
lines changed

11 files changed

+206
-31
lines changed

flow/activities/snapshot_activity.go

+3
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ func (a *SnapshotActivity) CloseSlotKeepAlive(ctx context.Context, flowJobName s
3232
connectors.CloseConnector(ctx, s.connector)
3333
delete(a.SnapshotConnections, flowJobName)
3434
}
35+
a.Alerter.LogFlowEvent(ctx, flowJobName, "Ended Snapshot Flow Job - "+flowJobName)
3536

3637
return nil
3738
}
@@ -49,6 +50,8 @@ func (a *SnapshotActivity) SetupReplication(
4950
return nil, nil
5051
}
5152

53+
a.Alerter.LogFlowEvent(ctx, config.FlowJobName, "Started Snapshot Flow Job - "+config.FlowJobName)
54+
5255
conn, err := connectors.GetCDCPullConnector(ctx, config.PeerConnectionConfig)
5356
if err != nil {
5457
return nil, fmt.Errorf("failed to get connector: %w", err)

flow/alerting/alerting.go

+39-4
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,13 @@ import (
1414
"github.com/PeerDB-io/peer-flow/generated/protos"
1515
"github.com/PeerDB-io/peer-flow/logger"
1616
"github.com/PeerDB-io/peer-flow/peerdbenv"
17+
"github.com/PeerDB-io/peer-flow/shared/telemetry"
1718
)
1819

1920
// alerting service, no cool name :(
2021
type Alerter struct {
21-
catalogPool *pgxpool.Pool
22+
catalogPool *pgxpool.Pool
23+
telemetrySender telemetry.Sender
2224
}
2325

2426
func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]*slackAlertSender, error) {
@@ -50,13 +52,25 @@ func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]*slackAlertSen
5052
}
5153

5254
// doesn't take care of closing pool, needs to be done externally.
53-
func NewAlerter(catalogPool *pgxpool.Pool) *Alerter {
55+
func NewAlerter(ctx context.Context, catalogPool *pgxpool.Pool) *Alerter {
5456
if catalogPool == nil {
5557
panic("catalog pool is nil for Alerter")
5658
}
57-
59+
snsTopic := peerdbenv.PeerDBTelemetryAWSSNSTopicArn()
60+
var snsMessageSender telemetry.Sender
61+
if snsTopic != "" {
62+
var err error
63+
snsMessageSender, err = telemetry.NewSNSMessageSenderWithNewClient(ctx, &telemetry.SNSMessageSenderConfig{
64+
Topic: snsTopic,
65+
})
66+
logger.LoggerFromCtx(ctx).Info("Successfully registered telemetry sender")
67+
if err != nil {
68+
panic(fmt.Sprintf("unable to setup telemetry is nil for Alerter %+v", err))
69+
}
70+
}
5871
return &Alerter{
59-
catalogPool: catalogPool,
72+
catalogPool: catalogPool,
73+
telemetrySender: snsMessageSender,
6074
}
6175
}
6276

@@ -193,6 +207,22 @@ func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertKey string
193207
return false
194208
}
195209

210+
func (a *Alerter) sendTelemetryMessage(ctx context.Context, flowName string, more string, level telemetry.Level) {
211+
if a.telemetrySender != nil {
212+
details := fmt.Sprintf("[%s] %s", flowName, more)
213+
_, err := a.telemetrySender.SendMessage(ctx, details, details, telemetry.Attributes{
214+
Level: level,
215+
DeploymentUID: peerdbenv.PeerDBDeploymentUID(),
216+
Tags: []string{flowName},
217+
Type: flowName,
218+
})
219+
if err != nil {
220+
logger.LoggerFromCtx(ctx).Warn("failed to send message to telemetrySender", slog.Any("error", err))
221+
return
222+
}
223+
}
224+
}
225+
196226
func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error) {
197227
errorWithStack := fmt.Sprintf("%+v", err)
198228
_, err = a.catalogPool.Exec(ctx,
@@ -202,6 +232,11 @@ func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error)
202232
logger.LoggerFromCtx(ctx).Warn("failed to insert flow error", slog.Any("error", err))
203233
return
204234
}
235+
a.sendTelemetryMessage(ctx, flowName, errorWithStack, telemetry.ERROR)
236+
}
237+
238+
func (a *Alerter) LogFlowEvent(ctx context.Context, flowName string, info string) {
239+
a.sendTelemetryMessage(ctx, flowName, info, telemetry.INFO)
205240
}
206241

207242
func (a *Alerter) LogFlowInfo(ctx context.Context, flowName string, info string) {

flow/cmd/snapshot_worker.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) (client.Client, worker.Work
6868
w.RegisterWorkflow(peerflow.SnapshotFlowWorkflow)
6969
w.RegisterActivity(&activities.SnapshotActivity{
7070
SnapshotConnections: make(map[string]activities.SlotSnapshotSignal),
71-
Alerter: alerting.NewAlerter(conn),
71+
Alerter: alerting.NewAlerter(context.Background(), conn),
7272
})
7373

7474
return c, w, nil

flow/cmd/worker.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ func WorkerMain(opts *WorkerOptions) (client.Client, worker.Worker, error) {
120120

121121
w.RegisterActivity(&activities.FlowableActivity{
122122
CatalogPool: conn,
123-
Alerter: alerting.NewAlerter(conn),
123+
Alerter: alerting.NewAlerter(context.Background(), conn),
124124
CdcCache: make(map[string]connectors.CDCPullConnector),
125125
})
126126

flow/go.mod

+12-5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module github.com/PeerDB-io/peer-flow
22

3-
go 1.22
3+
go 1.22.0
44

55
require (
66
cloud.google.com/go v0.112.0
@@ -10,10 +10,12 @@ require (
1010
github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.0.3
1111
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.2.0
1212
github.com/ClickHouse/clickhouse-go/v2 v2.18.0
13-
github.com/aws/aws-sdk-go-v2 v1.25.0
13+
github.com/aws/aws-sdk-go-v2 v1.25.2
14+
github.com/aws/aws-sdk-go-v2/config v1.27.0
1415
github.com/aws/aws-sdk-go-v2/credentials v1.17.0
1516
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.1
1617
github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0
18+
github.com/aws/aws-sdk-go-v2/service/sns v1.29.1
1719
github.com/cockroachdb/pebble v1.1.0
1820
github.com/google/uuid v1.6.0
1921
github.com/grafana/pyroscope-go v1.1.1
@@ -52,6 +54,11 @@ require (
5254
github.com/DataDog/zstd v1.5.5 // indirect
5355
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
5456
github.com/apache/arrow/go/v14 v14.0.2 // indirect
57+
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.0 // indirect
58+
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect
59+
github.com/aws/aws-sdk-go-v2/service/sso v1.19.0 // indirect
60+
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.22.0 // indirect
61+
github.com/aws/aws-sdk-go-v2/service/sts v1.27.0 // indirect
5562
github.com/beorn7/perks v1.0.1 // indirect
5663
github.com/cespare/xxhash/v2 v2.2.0 // indirect
5764
github.com/cockroachdb/errors v1.11.1 // indirect
@@ -103,14 +110,14 @@ require (
103110
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.1 // indirect
104111
github.com/andybalholm/brotli v1.1.0 // indirect
105112
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0 // indirect
106-
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0 // indirect
107-
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0 // indirect
113+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2 // indirect
114+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2 // indirect
108115
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.0 // indirect
109116
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.0 // indirect
110117
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.0 // indirect
111118
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.0 // indirect
112119
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0 // indirect
113-
github.com/aws/smithy-go v1.20.0 // indirect
120+
github.com/aws/smithy-go v1.20.1 // indirect
114121
github.com/davecgh/go-spew v1.1.1 // indirect
115122
github.com/djherbis/buffer v1.2.0
116123
github.com/djherbis/nio/v3 v3.0.1

flow/go.sum

+10-8
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer5
6464
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
6565
github.com/apache/arrow/go/v14 v14.0.2 h1:N8OkaJEOfI3mEZt07BIkvo4sC6XDbL+48MBPWO5IONw=
6666
github.com/apache/arrow/go/v14 v14.0.2/go.mod h1:u3fgh3EdgN/YQ8cVQRguVW3R+seMybFg8QBQ5LU+eBY=
67-
github.com/aws/aws-sdk-go-v2 v1.25.0 h1:sv7+1JVJxOu/dD/sz/csHX7jFqmP001TIY7aytBWDSQ=
68-
github.com/aws/aws-sdk-go-v2 v1.25.0/go.mod h1:G104G1Aho5WqF+SR3mDIobTABQzpYV0WxMsKxlMggOA=
67+
github.com/aws/aws-sdk-go-v2 v1.25.2 h1:/uiG1avJRgLGiQM9X3qJM8+Qa6KRGK5rRPuXE0HUM+w=
68+
github.com/aws/aws-sdk-go-v2 v1.25.2/go.mod h1:Evoc5AsmtveRt1komDwIsjHFyrP5tDuF1D1U+6z6pNo=
6969
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0 h1:2UO6/nT1lCZq1LqM67Oa4tdgP1CvL1sLSxvuD+VrOeE=
7070
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0/go.mod h1:5zGj2eA85ClyedTDK+Whsu+w9yimnVIZvhvBKrDquM8=
7171
github.com/aws/aws-sdk-go-v2/config v1.27.0 h1:J5sdGCAHuWKIXLeXiqr8II/adSvetkx0qdZwdbXXpb0=
@@ -76,10 +76,10 @@ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.0 h1:xWCwjjvVz2ojYTP4kBKUuUh
7676
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.15.0/go.mod h1:j3fACuqXg4oMTQOR2yY7m0NmJY0yBK4L4sLsRXq1Ins=
7777
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.1 h1:FqtJUSBgT2yfZ8kZhTi9AO131qMLOzb4MiH4riAM8XM=
7878
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.1/go.mod h1:G3V4qNUPMHKrXW/l149QXmHjf1vlMWBO4UuGPCK4a/c=
79-
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0 h1:NPs/EqVO+ajwOoq56EfcGKa3L3ruWuazkIw1BqxwOPw=
80-
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0/go.mod h1:D+duLy2ylgatV+yTlQ8JTuLfDD0BnFvnQRc+o6tbZ4M=
81-
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0 h1:ks7KGMVUMoDzcxNWUlEdI+/lokMFD136EL6DWmUOV80=
82-
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0/go.mod h1:hL6BWM/d/qz113fVitZjbXR0E+RCTU1+x+1Idyn5NgE=
79+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2 h1:bNo4LagzUKbjdxE0tIcR9pMzLR2U/Tgie1Hq1HQ3iH8=
80+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2/go.mod h1:wRQv0nN6v9wDXuWThpovGQjqF1HFdcgWjporw14lS8k=
81+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2 h1:EtOU5jsPdIQNP+6Q2C5e3d65NKT1PeCiQk+9OdzO12Q=
82+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.2/go.mod h1:tyF5sKccmDz0Bv4NrstEr+/9YkSPJHrcO7UsUKf7pWM=
8383
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 h1:hT8rVHwugYE2lEfdFE0QWVo81lF7jMrYJVDWI+f+VxU=
8484
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0/go.mod h1:8tu/lYfQfFe6IGnaOdrpVgEL2IrrDOf6/m9RQum4NkY=
8585
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.0 h1:TkbRExyKSVHELwG9gz2+gql37jjec2R5vus9faTomwE=
@@ -94,14 +94,16 @@ github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0 h1:l5puwOHr7IxECu
9494
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.0/go.mod h1:Oov79flWa/n7Ni+lQC3z+VM7PoRM47omRqbJU9B5Y7E=
9595
github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0 h1:jZAdMD1ioZdqirzzVVRhpHHWJmcGGCn8JqDYBs5nmYA=
9696
github.com/aws/aws-sdk-go-v2/service/s3 v1.50.0/go.mod h1:1o/W6JFUuREj2ExoQ21vHJgO7wakvjhol91M9eknFgs=
97+
github.com/aws/aws-sdk-go-v2/service/sns v1.29.1 h1:K2FiR/547lI9vGuDL0Ghin4QPSEvOKxbHY9aXFq8wfU=
98+
github.com/aws/aws-sdk-go-v2/service/sns v1.29.1/go.mod h1:PBmfgVv83oBgZVFhs/+oWsL6r0hLyB6qHRFEWwHyHn4=
9799
github.com/aws/aws-sdk-go-v2/service/sso v1.19.0 h1:u6OkVDxtBPnxPkZ9/63ynEe+8kHbtS5IfaC4PzVxzWM=
98100
github.com/aws/aws-sdk-go-v2/service/sso v1.19.0/go.mod h1:YqbU3RS/pkDVu+v+Nwxvn0i1WB0HkNWEePWbmODEbbs=
99101
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.22.0 h1:6DL0qu5+315wbsAEEmzK+P9leRwNbkp+lGjPC+CEvb8=
100102
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.22.0/go.mod h1:olUAyg+FaoFaL/zFaeQQONjOZ9HXoxgvI/c7mQTYz7M=
101103
github.com/aws/aws-sdk-go-v2/service/sts v1.27.0 h1:cjTRjh700H36MQ8M0LnDn33W3JmwC77mdxIIyPWCdpM=
102104
github.com/aws/aws-sdk-go-v2/service/sts v1.27.0/go.mod h1:nXfOBMWPokIbOY+Gi7a1psWMSvskUCemZzI+SMB7Akc=
103-
github.com/aws/smithy-go v1.20.0 h1:6+kZsCXZwKxZS9RfISnPc4EXlHoyAkm2hPuM8X2BrrQ=
104-
github.com/aws/smithy-go v1.20.0/go.mod h1:uo5RKksAl4PzhqaAbjd4rLgFoq5koTsQKYuGe7dklGc=
105+
github.com/aws/smithy-go v1.20.1 h1:4SZlSlMr36UEqC7XOyRVb27XMeZubNcBNN+9IgEPIQw=
106+
github.com/aws/smithy-go v1.20.1/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E=
105107
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
106108
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
107109
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=

flow/model/signals.go

-4
Original file line numberDiff line numberDiff line change
@@ -138,10 +138,6 @@ var SyncResultSignal = TypedSignal[*SyncResponse]{
138138
Name: "sync-result",
139139
}
140140

141-
var SyncOptionsSignal = TypedSignal[*protos.SyncFlowOptions]{
142-
Name: "sync-options",
143-
}
144-
145141
var NormalizeSignal = TypedSignal[NormalizePayload]{
146142
Name: "normalize",
147143
}

flow/peerdbenv/config.go

+5
Original file line numberDiff line numberDiff line change
@@ -90,3 +90,8 @@ func PeerDBEnableWALHeartbeat() bool {
9090
func PeerDBEnableParallelSyncNormalize() bool {
9191
return getEnvBool("PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE", false)
9292
}
93+
94+
// PEERDB_TELEMETRY_AWS_SNS_TOPIC_ARN
95+
func PeerDBTelemetryAWSSNSTopicArn() string {
96+
return getEnvString("PEERDB_TELEMETRY_AWS_SNS_TOPIC_ARN", "")
97+
}

flow/shared/telemetry/interface.go

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package telemetry
2+
3+
import (
4+
"context"
5+
)
6+
7+
type Sender interface {
8+
SendMessage(ctx context.Context, subject string, body string, attributes Attributes) (*string, error)
9+
}
10+
11+
type Attributes struct {
12+
Level Level
13+
DeploymentUID string
14+
Tags []string
15+
Type string
16+
}
17+
18+
type Level string
19+
20+
const (
21+
INFO Level = "INFO"
22+
WARN Level = "WARN"
23+
ERROR Level = "ERROR"
24+
CRITICAL Level = "CRITICAL"
25+
)
+110
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package telemetry
2+
3+
import (
4+
"context"
5+
"crypto/sha256"
6+
"encoding/hex"
7+
"strings"
8+
9+
"github.com/aws/aws-sdk-go-v2/aws"
10+
"github.com/aws/aws-sdk-go-v2/config"
11+
"github.com/aws/aws-sdk-go-v2/service/sns"
12+
"github.com/aws/aws-sdk-go-v2/service/sns/types"
13+
"go.temporal.io/sdk/activity"
14+
)
15+
16+
type SNSMessageSender interface {
17+
Sender
18+
}
19+
20+
type SNSMessageSenderImpl struct {
21+
client *sns.Client
22+
topic string
23+
}
24+
25+
type SNSMessageSenderConfig struct {
26+
Topic string `json:"topic"`
27+
}
28+
29+
func (s *SNSMessageSenderImpl) SendMessage(ctx context.Context, subject string, body string, attributes Attributes) (*string, error) {
30+
activityInfo := activity.GetInfo(ctx)
31+
deduplicationString := strings.Join([]string{
32+
"deployID", attributes.DeploymentUID,
33+
"subject", subject,
34+
"runID", activityInfo.WorkflowExecution.RunID,
35+
"activityName", activityInfo.ActivityType.Name,
36+
}, " || ")
37+
h := sha256.New()
38+
h.Write([]byte(deduplicationString))
39+
deduplicationHash := hex.EncodeToString(h.Sum(nil))
40+
41+
publish, err := s.client.Publish(ctx, &sns.PublishInput{
42+
Message: aws.String(body),
43+
MessageAttributes: map[string]types.MessageAttributeValue{
44+
"level": {
45+
DataType: aws.String("String"),
46+
StringValue: aws.String(string(attributes.Level)),
47+
},
48+
"tags": {
49+
DataType: aws.String("String"),
50+
StringValue: aws.String(strings.Join(attributes.Tags, ",")),
51+
},
52+
"deploymentUUID": {
53+
DataType: aws.String("String"),
54+
StringValue: aws.String(attributes.DeploymentUID),
55+
},
56+
"entity": {
57+
DataType: aws.String("String"),
58+
StringValue: aws.String(attributes.DeploymentUID),
59+
},
60+
"type": {
61+
DataType: aws.String("String"),
62+
StringValue: aws.String(attributes.Type),
63+
},
64+
"alias": { // This will act as a de-duplication ID
65+
DataType: aws.String("String"),
66+
StringValue: aws.String(deduplicationHash),
67+
},
68+
},
69+
Subject: aws.String(subject[:100]),
70+
TopicArn: aws.String(s.topic),
71+
})
72+
if err != nil {
73+
return nil, err
74+
}
75+
return publish.MessageId, nil
76+
}
77+
78+
func NewSNSMessageSenderWithNewClient(ctx context.Context, config *SNSMessageSenderConfig) (SNSMessageSender, error) {
79+
// Topic Region must match client region
80+
region := strings.Split(strings.TrimPrefix(config.Topic, "arn:aws:sns:"), ":")[0]
81+
client, err := newSnsClient(ctx, &region)
82+
if err != nil {
83+
return nil, err
84+
}
85+
return &SNSMessageSenderImpl{
86+
client: client,
87+
topic: config.Topic,
88+
}, nil
89+
}
90+
91+
func NewSNSMessageSender(client *sns.Client, config *SNSMessageSenderConfig) SNSMessageSender {
92+
return &SNSMessageSenderImpl{
93+
client: client,
94+
topic: config.Topic,
95+
}
96+
}
97+
98+
func newSnsClient(ctx context.Context, region *string) (*sns.Client, error) {
99+
sdkConfig, err := config.LoadDefaultConfig(ctx, func(options *config.LoadOptions) error {
100+
if region != nil {
101+
options.Region = *region
102+
}
103+
return nil
104+
})
105+
if err != nil {
106+
return nil, err
107+
}
108+
snsClient := sns.NewFromConfig(sdkConfig)
109+
return snsClient, nil
110+
}

flow/workflows/cdc_flow.go

-8
Original file line numberDiff line numberDiff line change
@@ -164,10 +164,6 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdate(ctx workflow.Conte
164164

165165
state.SyncFlowOptions.TableMappings = append(state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables...)
166166

167-
if w.syncFlowFuture != nil {
168-
_ = model.SyncOptionsSignal.SignalChildWorkflow(ctx, w.syncFlowFuture, state.SyncFlowOptions).Get(ctx, nil)
169-
}
170-
171167
// finished processing, wipe it
172168
state.FlowConfigUpdate = nil
173169
}
@@ -191,10 +187,6 @@ func (w *CDCFlowWorkflowExecution) addCdcPropertiesSignalListener(
191187
// do this irrespective of additional tables being present, for auto unpausing
192188
state.FlowConfigUpdate = cdcConfigUpdate
193189

194-
if w.syncFlowFuture != nil {
195-
_ = model.SyncOptionsSignal.SignalChildWorkflow(ctx, w.syncFlowFuture, state.SyncFlowOptions).Get(ctx, nil)
196-
}
197-
198190
w.logger.Info("CDC Signal received. Parameters on signal reception:",
199191
slog.Int("BatchSize", int(state.SyncFlowOptions.BatchSize)),
200192
slog.Int("IdleTimeout", int(state.SyncFlowOptions.IdleTimeoutSeconds)),

0 commit comments

Comments
 (0)