Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

flow/cmd: avoid context.Background, pass contexts down to queries #1158

Merged
merged 1 commit into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions flow/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
)

type APIServerParams struct {
ctx context.Context
Port uint16
GatewayPort uint16
TemporalHostPort string
Expand Down Expand Up @@ -88,8 +87,7 @@ func killExistingHeartbeatFlows(
return nil
}

func APIMain(args *APIServerParams) error {
ctx := args.ctx
func APIMain(ctx context.Context, args *APIServerParams) error {
clientOptions := client.Options{
HostPort: args.TemporalHostPort,
Namespace: args.TemporalNamespace,
Expand Down
20 changes: 9 additions & 11 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,7 @@ func (h *FlowRequestHandler) CreateCDCFlow(
}
}

var err error
err = h.updateFlowConfigInCatalog(cfg)
err := h.updateFlowConfigInCatalog(ctx, cfg)
if err != nil {
slog.Error("unable to update flow config in catalog", slog.Any("error", err))
return nil, fmt.Errorf("unable to update flow config in catalog: %w", err)
Expand All @@ -195,6 +194,7 @@ func (h *FlowRequestHandler) CreateCDCFlow(
}

func (h *FlowRequestHandler) updateFlowConfigInCatalog(
ctx context.Context,
cfg *protos.FlowConnectionConfigs,
) error {
var cfgBytes []byte
Expand All @@ -205,9 +205,7 @@ func (h *FlowRequestHandler) updateFlowConfigInCatalog(
return fmt.Errorf("unable to marshal flow config: %w", err)
}

_, err = h.pool.Exec(context.Background(),
"UPDATE flows SET config_proto = $1 WHERE name = $2",
cfgBytes, cfg.FlowJobName)
_, err = h.pool.Exec(ctx, "UPDATE flows SET config_proto = $1 WHERE name = $2", cfgBytes, cfg.FlowJobName)
if err != nil {
return fmt.Errorf("unable to update flow config in catalog: %w", err)
}
Expand All @@ -216,11 +214,10 @@ func (h *FlowRequestHandler) updateFlowConfigInCatalog(
}

func (h *FlowRequestHandler) removeFlowEntryInCatalog(
ctx context.Context,
flowName string,
) error {
_, err := h.pool.Exec(context.Background(),
"DELETE FROM flows WHERE name = $1",
flowName)
_, err := h.pool.Exec(ctx, "DELETE FROM flows WHERE name = $1", flowName)
if err != nil {
return fmt.Errorf("unable to remove flow entry in catalog: %w", err)
}
Expand Down Expand Up @@ -286,7 +283,7 @@ func (h *FlowRequestHandler) CreateQRepFlow(
return nil, fmt.Errorf("unable to start QRepFlow workflow: %w", err)
}

err = h.updateQRepConfigInCatalog(cfg)
err = h.updateQRepConfigInCatalog(ctx, cfg)
if err != nil {
slog.Error("unable to update qrep config in catalog",
slog.Any("error", err), slog.String("flowName", cfg.FlowJobName))
Expand All @@ -300,6 +297,7 @@ func (h *FlowRequestHandler) CreateQRepFlow(

// updateQRepConfigInCatalog updates the qrep config in the catalog
func (h *FlowRequestHandler) updateQRepConfigInCatalog(
ctx context.Context,
cfg *protos.QRepConfig,
) error {
var cfgBytes []byte
Expand All @@ -310,7 +308,7 @@ func (h *FlowRequestHandler) updateQRepConfigInCatalog(
return fmt.Errorf("unable to marshal qrep config: %w", err)
}

_, err = h.pool.Exec(context.Background(),
_, err = h.pool.Exec(ctx,
"UPDATE flows SET config_proto = $1 WHERE name = $2",
cfgBytes, cfg.FlowJobName)
if err != nil {
Expand Down Expand Up @@ -417,7 +415,7 @@ func (h *FlowRequestHandler) ShutdownFlow(
}

if req.RemoveFlowEntry {
delErr := h.removeFlowEntryInCatalog(req.FlowJobName)
delErr := h.removeFlowEntryInCatalog(ctx, req.FlowJobName)
if delErr != nil {
slog.Error("unable to remove flow job entry",
slog.String(string(shared.FlowNameKey), req.FlowJobName),
Expand Down
5 changes: 2 additions & 3 deletions flow/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,7 @@ func main() {
Action: func(ctx context.Context, cmd *cli.Command) error {
temporalHostPort := cmd.String("temporal-host-port")

return APIMain(&APIServerParams{
ctx: appCtx,
return APIMain(ctx, &APIServerParams{
Port: uint16(cmd.Uint("port")),
TemporalHostPort: temporalHostPort,
GatewayPort: uint16(cmd.Uint("gateway-port")),
Expand All @@ -144,7 +143,7 @@ func main() {
},
}

if err := app.Run(context.Background(), os.Args); err != nil {
if err := app.Run(appCtx, os.Args); err != nil {
log.Fatalf("error running app: %v", err)
}
}
11 changes: 6 additions & 5 deletions flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (h *FlowRequestHandler) CDCFlowStatus(
req *protos.MirrorStatusRequest,
) (*protos.CDCMirrorStatus, error) {
slog.Info("CDC mirror status endpoint called", slog.String(string(shared.FlowNameKey), req.FlowJobName))
config, err := h.getFlowConfigFromCatalog(req.FlowJobName)
config, err := h.getFlowConfigFromCatalog(ctx, req.FlowJobName)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -207,7 +207,7 @@ func (h *FlowRequestHandler) QRepFlowStatus(
return &protos.QRepMirrorStatus{
// The clone table jobs that are children of the CDC snapshot flow
// do not have a config entry, so allow this to be nil.
Config: h.getQRepConfigFromCatalog(req.FlowJobName),
Config: h.getQRepConfigFromCatalog(ctx, req.FlowJobName),
Partitions: partitionStatuses,
}, nil
}
Expand Down Expand Up @@ -257,13 +257,14 @@ func (h *FlowRequestHandler) getPartitionStatuses(
}

func (h *FlowRequestHandler) getFlowConfigFromCatalog(
ctx context.Context,
flowJobName string,
) (*protos.FlowConnectionConfigs, error) {
var configBytes sql.RawBytes
var err error
var config protos.FlowConnectionConfigs

err = h.pool.QueryRow(context.Background(),
err = h.pool.QueryRow(ctx,
"SELECT config_proto FROM flows WHERE name = $1", flowJobName).Scan(&configBytes)
if err != nil {
slog.Error(fmt.Sprintf("unable to query flow config from catalog: %s", err.Error()))
Expand All @@ -279,7 +280,7 @@ func (h *FlowRequestHandler) getFlowConfigFromCatalog(
return &config, nil
}

func (h *FlowRequestHandler) getQRepConfigFromCatalog(flowJobName string) *protos.QRepConfig {
func (h *FlowRequestHandler) getQRepConfigFromCatalog(ctx context.Context, flowJobName string) *protos.QRepConfig {
var configBytes []byte
var config protos.QRepConfig

Expand All @@ -299,7 +300,7 @@ func (h *FlowRequestHandler) getQRepConfigFromCatalog(flowJobName string) *proto

// Iterate over queries and attempt to fetch the config
for _, qInfo := range queryInfos {
err := h.pool.QueryRow(context.Background(), qInfo.Query, flowJobName).Scan(&configBytes)
err := h.pool.QueryRow(ctx, qInfo.Query, flowJobName).Scan(&configBytes)
if err == nil {
break
}
Expand Down
Loading