Skip to content

Commit

Permalink
flow/cmd: avoid context.Background, pass contexts down to queries (#1158
Browse files Browse the repository at this point in the history
)
  • Loading branch information
serprex authored Jan 25, 2024
1 parent 8adab3f commit 9e14094
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 22 deletions.
4 changes: 1 addition & 3 deletions flow/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
)

type APIServerParams struct {
ctx context.Context
Port uint16
GatewayPort uint16
TemporalHostPort string
Expand Down Expand Up @@ -88,8 +87,7 @@ func killExistingHeartbeatFlows(
return nil
}

func APIMain(args *APIServerParams) error {
ctx := args.ctx
func APIMain(ctx context.Context, args *APIServerParams) error {
clientOptions := client.Options{
HostPort: args.TemporalHostPort,
Namespace: args.TemporalNamespace,
Expand Down
20 changes: 9 additions & 11 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,7 @@ func (h *FlowRequestHandler) CreateCDCFlow(
}
}

var err error
err = h.updateFlowConfigInCatalog(cfg)
err := h.updateFlowConfigInCatalog(ctx, cfg)
if err != nil {
slog.Error("unable to update flow config in catalog", slog.Any("error", err))
return nil, fmt.Errorf("unable to update flow config in catalog: %w", err)
Expand All @@ -195,6 +194,7 @@ func (h *FlowRequestHandler) CreateCDCFlow(
}

func (h *FlowRequestHandler) updateFlowConfigInCatalog(
ctx context.Context,
cfg *protos.FlowConnectionConfigs,
) error {
var cfgBytes []byte
Expand All @@ -205,9 +205,7 @@ func (h *FlowRequestHandler) updateFlowConfigInCatalog(
return fmt.Errorf("unable to marshal flow config: %w", err)
}

_, err = h.pool.Exec(context.Background(),
"UPDATE flows SET config_proto = $1 WHERE name = $2",
cfgBytes, cfg.FlowJobName)
_, err = h.pool.Exec(ctx, "UPDATE flows SET config_proto = $1 WHERE name = $2", cfgBytes, cfg.FlowJobName)
if err != nil {
return fmt.Errorf("unable to update flow config in catalog: %w", err)
}
Expand All @@ -216,11 +214,10 @@ func (h *FlowRequestHandler) updateFlowConfigInCatalog(
}

func (h *FlowRequestHandler) removeFlowEntryInCatalog(
ctx context.Context,
flowName string,
) error {
_, err := h.pool.Exec(context.Background(),
"DELETE FROM flows WHERE name = $1",
flowName)
_, err := h.pool.Exec(ctx, "DELETE FROM flows WHERE name = $1", flowName)
if err != nil {
return fmt.Errorf("unable to remove flow entry in catalog: %w", err)
}
Expand Down Expand Up @@ -286,7 +283,7 @@ func (h *FlowRequestHandler) CreateQRepFlow(
return nil, fmt.Errorf("unable to start QRepFlow workflow: %w", err)
}

err = h.updateQRepConfigInCatalog(cfg)
err = h.updateQRepConfigInCatalog(ctx, cfg)
if err != nil {
slog.Error("unable to update qrep config in catalog",
slog.Any("error", err), slog.String("flowName", cfg.FlowJobName))
Expand All @@ -300,6 +297,7 @@ func (h *FlowRequestHandler) CreateQRepFlow(

// updateQRepConfigInCatalog updates the qrep config in the catalog
func (h *FlowRequestHandler) updateQRepConfigInCatalog(
ctx context.Context,
cfg *protos.QRepConfig,
) error {
var cfgBytes []byte
Expand All @@ -310,7 +308,7 @@ func (h *FlowRequestHandler) updateQRepConfigInCatalog(
return fmt.Errorf("unable to marshal qrep config: %w", err)
}

_, err = h.pool.Exec(context.Background(),
_, err = h.pool.Exec(ctx,
"UPDATE flows SET config_proto = $1 WHERE name = $2",
cfgBytes, cfg.FlowJobName)
if err != nil {
Expand Down Expand Up @@ -417,7 +415,7 @@ func (h *FlowRequestHandler) ShutdownFlow(
}

if req.RemoveFlowEntry {
delErr := h.removeFlowEntryInCatalog(req.FlowJobName)
delErr := h.removeFlowEntryInCatalog(ctx, req.FlowJobName)
if delErr != nil {
slog.Error("unable to remove flow job entry",
slog.String(string(shared.FlowNameKey), req.FlowJobName),
Expand Down
5 changes: 2 additions & 3 deletions flow/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,7 @@ func main() {
Action: func(ctx context.Context, cmd *cli.Command) error {
temporalHostPort := cmd.String("temporal-host-port")

return APIMain(&APIServerParams{
ctx: appCtx,
return APIMain(ctx, &APIServerParams{
Port: uint16(cmd.Uint("port")),
TemporalHostPort: temporalHostPort,
GatewayPort: uint16(cmd.Uint("gateway-port")),
Expand All @@ -144,7 +143,7 @@ func main() {
},
}

if err := app.Run(context.Background(), os.Args); err != nil {
if err := app.Run(appCtx, os.Args); err != nil {
log.Fatalf("error running app: %v", err)
}
}
11 changes: 6 additions & 5 deletions flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (h *FlowRequestHandler) CDCFlowStatus(
req *protos.MirrorStatusRequest,
) (*protos.CDCMirrorStatus, error) {
slog.Info("CDC mirror status endpoint called", slog.String(string(shared.FlowNameKey), req.FlowJobName))
config, err := h.getFlowConfigFromCatalog(req.FlowJobName)
config, err := h.getFlowConfigFromCatalog(ctx, req.FlowJobName)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -207,7 +207,7 @@ func (h *FlowRequestHandler) QRepFlowStatus(
return &protos.QRepMirrorStatus{
// The clone table jobs that are children of the CDC snapshot flow
// do not have a config entry, so allow this to be nil.
Config: h.getQRepConfigFromCatalog(req.FlowJobName),
Config: h.getQRepConfigFromCatalog(ctx, req.FlowJobName),
Partitions: partitionStatuses,
}, nil
}
Expand Down Expand Up @@ -257,13 +257,14 @@ func (h *FlowRequestHandler) getPartitionStatuses(
}

func (h *FlowRequestHandler) getFlowConfigFromCatalog(
ctx context.Context,
flowJobName string,
) (*protos.FlowConnectionConfigs, error) {
var configBytes sql.RawBytes
var err error
var config protos.FlowConnectionConfigs

err = h.pool.QueryRow(context.Background(),
err = h.pool.QueryRow(ctx,
"SELECT config_proto FROM flows WHERE name = $1", flowJobName).Scan(&configBytes)
if err != nil {
slog.Error(fmt.Sprintf("unable to query flow config from catalog: %s", err.Error()))
Expand All @@ -279,7 +280,7 @@ func (h *FlowRequestHandler) getFlowConfigFromCatalog(
return &config, nil
}

func (h *FlowRequestHandler) getQRepConfigFromCatalog(flowJobName string) *protos.QRepConfig {
func (h *FlowRequestHandler) getQRepConfigFromCatalog(ctx context.Context, flowJobName string) *protos.QRepConfig {
var configBytes []byte
var config protos.QRepConfig

Expand All @@ -299,7 +300,7 @@ func (h *FlowRequestHandler) getQRepConfigFromCatalog(flowJobName string) *proto

// Iterate over queries and attempt to fetch the config
for _, qInfo := range queryInfos {
err := h.pool.QueryRow(context.Background(), qInfo.Query, flowJobName).Scan(&configBytes)
err := h.pool.QueryRow(ctx, qInfo.Query, flowJobName).Scan(&configBytes)
if err == nil {
break
}
Expand Down

0 comments on commit 9e14094

Please sign in to comment.