Skip to content

Commit

Permalink
suggested changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Nov 24, 2023
1 parent 633302f commit 89bbbe8
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 5 deletions.
6 changes: 3 additions & 3 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,15 +663,15 @@ func (a *FlowableActivity) DropFlow(ctx context.Context, config *protos.Shutdown
return nil
}

func GetPostgresPeerConfigs(ctx context.Context) ([]*protos.PostgresConfig, error) {
func getPostgresPeerConfigs(ctx context.Context) ([]*protos.PostgresConfig, error) {
var peerOptions sql.RawBytes
catalogPool, catalogErr := catalog.GetCatalogConnectionPoolFromEnv()
if catalogErr != nil {
return nil, fmt.Errorf("error getting catalog connection pool: %w", catalogErr)
}
defer catalogPool.Close()

optionRows, err := catalogPool.Query(ctx, "SELECT options FROM peers WHERE type=3")
optionRows, err := catalogPool.Query(ctx, "SELECT options FROM peers WHERE type=$1", protos.DBType_POSTGRES)
if err != nil {
return nil, err
}
Expand All @@ -697,7 +697,7 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
ticker := time.NewTicker(sendTimeout)
defer ticker.Stop()

pgConfigs, err := GetPostgresPeerConfigs(ctx)
pgConfigs, err := getPostgresPeerConfigs(ctx)
if err != nil {
return fmt.Errorf("error getting postgres peer configs: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions flow/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func setupGRPCGatewayServer(args *APIServerParams) (*http.Server, error) {
return server, nil
}

func KillExistingHeartbeatFlows(ctx context.Context, tc client.Client, namespace string) error {
func killExistingHeartbeatFlows(ctx context.Context, tc client.Client, namespace string) error {
listRes, err := tc.ListWorkflow(ctx,
&workflowservice.ListWorkflowExecutionsRequest{
Namespace: namespace,
Expand Down Expand Up @@ -116,7 +116,7 @@ func APIMain(args *APIServerParams) error {
flowHandler := NewFlowRequestHandler(tc, catalogConn)
defer flowHandler.Close()

err = KillExistingHeartbeatFlows(ctx, tc, args.TemporalNamespace)
err = killExistingHeartbeatFlows(ctx, tc, args.TemporalNamespace)
if err != nil {
return fmt.Errorf("unable to kill existing heartbeat flows: %w", err)
}
Expand Down

0 comments on commit 89bbbe8

Please sign in to comment.