From 4f0fd06fc4269fa60e533374234b001e9d7a73c0 Mon Sep 17 00:00:00 2001
From: Amogh-Bharadwaj <amogh@peerdb.io>
Date: Wed, 14 Feb 2024 01:15:16 +0530
Subject: [PATCH] null out invalid timestamp in pull side

---
 flow/connectors/postgres/cdc.go | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go
index c99bf40a79..372fb68129 100644
--- a/flow/connectors/postgres/cdc.go
+++ b/flow/connectors/postgres/cdc.go
@@ -719,6 +719,14 @@ func (p *PostgresCDCSource) decodeColumnData(data []byte, dataType uint32, forma
 			parsedData, err = dt.Codec.DecodeValue(p.typeMap, dataType, formatCode, data)
 		}
 		if err != nil {
+			if dt.Name == "time" || dt.Name == "timetz" ||
+				dt.Name == "timestamp" || dt.Name == "timestamptz" {
+				p.logger.Info(fmt.Sprintf("Invalidated and hence nulled %s data: %s",
+					dt.Name, string(data)))
+				// indicates year is more than 4 digits,
+				// or a malformed time string in general
+				return qvalue.QValue{}, nil
+			}
 			return qvalue.QValue{}, err
 		}
 		retVal, err := parseFieldFromPostgresOID(dataType, parsedData)