From ef229c65973460e64d8707ccfed270dd66ee024a Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Sat, 13 Apr 2024 03:07:37 +0530 Subject: [PATCH] comment out kafka push --- flow/connectors/kafka/kafka.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/flow/connectors/kafka/kafka.go b/flow/connectors/kafka/kafka.go index bb36aee8ef..bbdc64e304 100644 --- a/flow/connectors/kafka/kafka.go +++ b/flow/connectors/kafka/kafka.go @@ -168,13 +168,13 @@ func lvalueToKafkaRecord(ls *lua.LState, value lua.LValue) (*kgo.Record, error) func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest[model.RecordItems]) (*model.SyncResponse, error) { var wg sync.WaitGroup - wgCtx, wgErr := context.WithCancelCause(ctx) - produceCb := func(r *kgo.Record, err error) { - if err != nil { - wgErr(err) - } - wg.Done() - } + wgCtx, _ := context.WithCancelCause(ctx) + // produceCb := func(r *kgo.Record, err error) { + // if err != nil { + // wgErr(err) + // } + // wg.Done() + // } numRecords := int64(0) tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings) @@ -259,8 +259,8 @@ Loop: kr.Topic = record.GetDestinationTableName() } - wg.Add(1) - c.client.Produce(wgCtx, kr, produceCb) + //wg.Add(1) + //c.client.Produce(wgCtx, kr, produceCb) record.PopulateCountMap(tableNameRowsMapping) } }