Skip to content

Commit 4eafbf4

Browse files
check for peer table existence
1 parent 8a17bde commit 4eafbf4

File tree

2 files changed

+14
-14
lines changed

2 files changed

+14
-14
lines changed

.github/workflows/flow.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ jobs:
9292
env:
9393
PG_CDC: empty
9494
PGPASSWORD: postgres
95-
95+
9696
- name: run tests
9797
run: |
9898
gotestsum --format testname -- -p 8 ./... -timeout 2400s

flow/activities/flowable.go

+13-13
Original file line numberDiff line numberDiff line change
@@ -706,24 +706,24 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
706706
sendTimeout := 10 * time.Minute
707707
ticker := time.NewTicker(sendTimeout)
708708
defer ticker.Stop()
709-
710-
peerTableExists := true
711-
pgPeers, err := getPostgresPeerConfigs(ctx)
712-
if err != nil {
713-
if strings.Contains(err.Error(), "does not exist") {
714-
log.Warn("[sendwalheartbeat]: warning: no postgres peers found in catalog. Now I will be sleeping repeatedly")
715-
peerTableExists = false
716-
}
717-
}
718-
719709
activity.RecordHeartbeat(ctx, "sending walheartbeat every 10 minutes")
720710
for {
721711
select {
722712
case <-ctx.Done():
723713
log.Info("context is done, exiting wal heartbeat send loop")
724714
return nil
725715
case <-ticker.C:
726-
if peerTableExists {
716+
peersTableExists := true
717+
pgPeers, err := getPostgresPeerConfigs(ctx)
718+
if err != nil {
719+
if strings.Contains(err.Error(), "does not exist") {
720+
log.Warn("[sendwalheartbeat]: warning: peers table not found. skipping walheartbeat send.")
721+
peersTableExists = false
722+
}
723+
return fmt.Errorf("error getting postgres peers: %w", err)
724+
}
725+
726+
if peersTableExists {
727727
command := `
728728
BEGIN;
729729
DROP aggregate IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4);
@@ -753,9 +753,9 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
753753
log.Infof("sent walheartbeat to peer %v", pgPeer.Name)
754754
}
755755
}
756-
ticker.Stop()
757-
ticker = time.NewTicker(sendTimeout)
758756
}
757+
ticker.Stop()
758+
ticker = time.NewTicker(sendTimeout)
759759
}
760760
}
761761

0 commit comments

Comments
 (0)