Skip to content

Commit

Permalink
undo generated file changes, add better logging
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Nov 24, 2023
1 parent 62ef43c commit b1c4256
Show file tree
Hide file tree
Showing 3 changed files with 1,350 additions and 65 deletions.
34 changes: 22 additions & 12 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,22 +663,24 @@ func (a *FlowableActivity) DropFlow(ctx context.Context, config *protos.Shutdown
return nil
}

func getPostgresPeerConfigs(ctx context.Context) ([]*protos.PostgresConfig, error) {
func getPostgresPeerConfigs(ctx context.Context) ([]*protos.Peer, error) {
var peerOptions sql.RawBytes
catalogPool, catalogErr := catalog.GetCatalogConnectionPoolFromEnv()
if catalogErr != nil {
return nil, fmt.Errorf("error getting catalog connection pool: %w", catalogErr)
}
defer catalogPool.Close()

optionRows, err := catalogPool.Query(ctx, "SELECT options FROM peers WHERE type=$1", protos.DBType_POSTGRES)
optionRows, err := catalogPool.Query(ctx,
"SELECT name, options FROM peers WHERE type=$1", protos.DBType_POSTGRES)
if err != nil {
return nil, err
}
defer optionRows.Close()
var peerConfigs []*protos.PostgresConfig
var peerName string
var postgresPeers []*protos.Peer
for optionRows.Next() {
err := optionRows.Scan(&peerOptions)
err := optionRows.Scan(&peerName, &peerOptions)
if err != nil {
return nil, err
}
Expand All @@ -687,17 +689,21 @@ func getPostgresPeerConfigs(ctx context.Context) ([]*protos.PostgresConfig, erro
if unmarshalErr != nil {
return nil, unmarshalErr
}
peerConfigs = append(peerConfigs, &pgPeerConfig)
postgresPeers = append(postgresPeers, &protos.Peer{
Name: peerName,
Type: protos.DBType_POSTGRES,
Config: &protos.Peer_PostgresConfig{PostgresConfig: &pgPeerConfig},
})
}
return peerConfigs, nil
return postgresPeers, nil
}

func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
sendTimeout := 10 * time.Minute
sendTimeout := 10 * time.Second
ticker := time.NewTicker(sendTimeout)
defer ticker.Stop()

pgConfigs, err := getPostgresPeerConfigs(ctx)
pgPeers, err := getPostgresPeerConfigs(ctx)
if err != nil {
return fmt.Errorf("error getting postgres peer configs: %w", err)
}
Expand All @@ -717,21 +723,25 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
END;
`
// run above command for each Postgres peer
for _, pgConfig := range pgConfigs {
for _, pgPeer := range pgPeers {
pgConfig := pgPeer.GetPostgresConfig()
peerConn, peerErr := pgx.Connect(ctx, utils.GetPGConnectionString(pgConfig))
if peerErr != nil {
return fmt.Errorf("error creating pool for postgres peer with host %v: %w", pgConfig.Host, peerErr)
return fmt.Errorf("error creating pool for postgres peer %v with host %v: %w",
pgPeer.Name, pgConfig.Host, peerErr)
}

_, err := peerConn.Exec(ctx, command)
if err != nil {
log.Warnf("warning: could not send walheartbeat to host %v: %v", pgConfig.Host, err)
log.Warnf("warning: could not send walheartbeat to peer %v: %v", pgPeer.Name, err)
}

closeErr := peerConn.Close(ctx)
if closeErr != nil {
return fmt.Errorf("error closing postgres connection for host %v: %w", pgConfig.Host, closeErr)
return fmt.Errorf("error closing postgres connection for peer %v with host %v: %w",
pgPeer.Name, pgConfig.Host, closeErr)
}
log.Infof("sent walheartbeat to peer %v", pgPeer.Name)
}
ticker.Stop()
ticker = time.NewTicker(sendTimeout)
Expand Down
Loading

0 comments on commit b1c4256

Please sign in to comment.