diff --git a/flow/cmd/api.go b/flow/cmd/api.go index 1e6d3c6d17..09bfb4c5df 100644 --- a/flow/cmd/api.go +++ b/flow/cmd/api.go @@ -27,7 +27,6 @@ import ( ) type APIServerParams struct { - ctx context.Context Port uint16 GatewayPort uint16 TemporalHostPort string @@ -88,8 +87,7 @@ func killExistingHeartbeatFlows( return nil } -func APIMain(args *APIServerParams) error { - ctx := args.ctx +func APIMain(ctx context.Context, args *APIServerParams) error { clientOptions := client.Options{ HostPort: args.TemporalHostPort, Namespace: args.TemporalNamespace, diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 93d5ff1ea6..c5c930c143 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -169,8 +169,7 @@ func (h *FlowRequestHandler) CreateCDCFlow( } } - var err error - err = h.updateFlowConfigInCatalog(cfg) + err := h.updateFlowConfigInCatalog(ctx, cfg) if err != nil { slog.Error("unable to update flow config in catalog", slog.Any("error", err)) return nil, fmt.Errorf("unable to update flow config in catalog: %w", err) @@ -195,6 +194,7 @@ func (h *FlowRequestHandler) CreateCDCFlow( } func (h *FlowRequestHandler) updateFlowConfigInCatalog( + ctx context.Context, cfg *protos.FlowConnectionConfigs, ) error { var cfgBytes []byte @@ -205,9 +205,7 @@ func (h *FlowRequestHandler) updateFlowConfigInCatalog( return fmt.Errorf("unable to marshal flow config: %w", err) } - _, err = h.pool.Exec(context.Background(), - "UPDATE flows SET config_proto = $1 WHERE name = $2", - cfgBytes, cfg.FlowJobName) + _, err = h.pool.Exec(ctx, "UPDATE flows SET config_proto = $1 WHERE name = $2", cfgBytes, cfg.FlowJobName) if err != nil { return fmt.Errorf("unable to update flow config in catalog: %w", err) } @@ -216,11 +214,10 @@ func (h *FlowRequestHandler) updateFlowConfigInCatalog( } func (h *FlowRequestHandler) removeFlowEntryInCatalog( + ctx context.Context, flowName string, ) error { - _, err := h.pool.Exec(context.Background(), - "DELETE FROM flows WHERE name = $1", - flowName) + _, err := h.pool.Exec(ctx, "DELETE FROM flows WHERE name = $1", flowName) if err != nil { return fmt.Errorf("unable to remove flow entry in catalog: %w", err) } @@ -286,7 +283,7 @@ func (h *FlowRequestHandler) CreateQRepFlow( return nil, fmt.Errorf("unable to start QRepFlow workflow: %w", err) } - err = h.updateQRepConfigInCatalog(cfg) + err = h.updateQRepConfigInCatalog(ctx, cfg) if err != nil { slog.Error("unable to update qrep config in catalog", slog.Any("error", err), slog.String("flowName", cfg.FlowJobName)) @@ -300,6 +297,7 @@ func (h *FlowRequestHandler) CreateQRepFlow( // updateQRepConfigInCatalog updates the qrep config in the catalog func (h *FlowRequestHandler) updateQRepConfigInCatalog( + ctx context.Context, cfg *protos.QRepConfig, ) error { var cfgBytes []byte @@ -310,7 +308,7 @@ func (h *FlowRequestHandler) updateQRepConfigInCatalog( return fmt.Errorf("unable to marshal qrep config: %w", err) } - _, err = h.pool.Exec(context.Background(), + _, err = h.pool.Exec(ctx, "UPDATE flows SET config_proto = $1 WHERE name = $2", cfgBytes, cfg.FlowJobName) if err != nil { @@ -417,7 +415,7 @@ func (h *FlowRequestHandler) ShutdownFlow( } if req.RemoveFlowEntry { - delErr := h.removeFlowEntryInCatalog(req.FlowJobName) + delErr := h.removeFlowEntryInCatalog(ctx, req.FlowJobName) if delErr != nil { slog.Error("unable to remove flow job entry", slog.String(string(shared.FlowNameKey), req.FlowJobName), diff --git a/flow/cmd/main.go b/flow/cmd/main.go index 1d924e3ccc..d6d23ff1ab 100644 --- a/flow/cmd/main.go +++ b/flow/cmd/main.go @@ -130,8 +130,7 @@ func main() { Action: func(ctx context.Context, cmd *cli.Command) error { temporalHostPort := cmd.String("temporal-host-port") - return APIMain(&APIServerParams{ - ctx: appCtx, + return APIMain(ctx, &APIServerParams{ Port: uint16(cmd.Uint("port")), TemporalHostPort: temporalHostPort, GatewayPort: uint16(cmd.Uint("gateway-port")), @@ -144,7 +143,7 @@ func main() { }, } - if err := app.Run(context.Background(), os.Args); err != nil { + if err := app.Run(appCtx, os.Args); err != nil { log.Fatalf("error running app: %v", err) } } diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index df7862d8fe..ff14f04f35 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -77,7 +77,7 @@ func (h *FlowRequestHandler) CDCFlowStatus( req *protos.MirrorStatusRequest, ) (*protos.CDCMirrorStatus, error) { slog.Info("CDC mirror status endpoint called", slog.String(string(shared.FlowNameKey), req.FlowJobName)) - config, err := h.getFlowConfigFromCatalog(req.FlowJobName) + config, err := h.getFlowConfigFromCatalog(ctx, req.FlowJobName) if err != nil { return nil, err } @@ -207,7 +207,7 @@ func (h *FlowRequestHandler) QRepFlowStatus( return &protos.QRepMirrorStatus{ // The clone table jobs that are children of the CDC snapshot flow // do not have a config entry, so allow this to be nil. - Config: h.getQRepConfigFromCatalog(req.FlowJobName), + Config: h.getQRepConfigFromCatalog(ctx, req.FlowJobName), Partitions: partitionStatuses, }, nil } @@ -257,13 +257,14 @@ func (h *FlowRequestHandler) getPartitionStatuses( } func (h *FlowRequestHandler) getFlowConfigFromCatalog( + ctx context.Context, flowJobName string, ) (*protos.FlowConnectionConfigs, error) { var configBytes sql.RawBytes var err error var config protos.FlowConnectionConfigs - err = h.pool.QueryRow(context.Background(), + err = h.pool.QueryRow(ctx, "SELECT config_proto FROM flows WHERE name = $1", flowJobName).Scan(&configBytes) if err != nil { slog.Error(fmt.Sprintf("unable to query flow config from catalog: %s", err.Error())) @@ -279,7 +280,7 @@ func (h *FlowRequestHandler) getFlowConfigFromCatalog( return &config, nil } -func (h *FlowRequestHandler) getQRepConfigFromCatalog(flowJobName string) *protos.QRepConfig { +func (h *FlowRequestHandler) getQRepConfigFromCatalog(ctx context.Context, flowJobName string) *protos.QRepConfig { var configBytes []byte var config protos.QRepConfig @@ -299,7 +300,7 @@ func (h *FlowRequestHandler) getQRepConfigFromCatalog(flowJobName string) *proto // Iterate over queries and attempt to fetch the config for _, qInfo := range queryInfos { - err := h.pool.QueryRow(context.Background(), qInfo.Query, flowJobName).Scan(&configBytes) + err := h.pool.QueryRow(ctx, qInfo.Query, flowJobName).Scan(&configBytes) if err == nil { break }