Skip to content

Commit

Permalink
comment out kafka push
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Apr 12, 2024
1 parent 2c05847 commit ef229c6
Showing 1 changed file with 9 additions and 9 deletions.
18 changes: 9 additions & 9 deletions flow/connectors/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,13 @@ func lvalueToKafkaRecord(ls *lua.LState, value lua.LValue) (*kgo.Record, error)

func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest[model.RecordItems]) (*model.SyncResponse, error) {
var wg sync.WaitGroup
wgCtx, wgErr := context.WithCancelCause(ctx)
produceCb := func(r *kgo.Record, err error) {
if err != nil {
wgErr(err)
}
wg.Done()
}
wgCtx, _ := context.WithCancelCause(ctx)
// produceCb := func(r *kgo.Record, err error) {
// if err != nil {
// wgErr(err)
// }
// wg.Done()
// }

numRecords := int64(0)
tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings)
Expand Down Expand Up @@ -259,8 +259,8 @@ Loop:
kr.Topic = record.GetDestinationTableName()
}

wg.Add(1)
c.client.Produce(wgCtx, kr, produceCb)
//wg.Add(1)
//c.client.Produce(wgCtx, kr, produceCb)
record.PopulateCountMap(tableNameRowsMapping)
}
}
Expand Down

0 comments on commit ef229c6

Please sign in to comment.