From 803fcc54d333ff43bbff1fed3f3206425acba36b Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Thu, 18 Apr 2024 00:50:11 +0530 Subject: [PATCH 1/3] experiment: stop workflow erroring on ES load failure --- flow/connectors/connelasticsearch/elasticsearch.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/flow/connectors/connelasticsearch/elasticsearch.go b/flow/connectors/connelasticsearch/elasticsearch.go index cd68f49879..5217c33aff 100644 --- a/flow/connectors/connelasticsearch/elasticsearch.go +++ b/flow/connectors/connelasticsearch/elasticsearch.go @@ -144,7 +144,7 @@ func (esc *ElasticsearchConnector) SyncQRepRecords(ctx context.Context, config * qRecordJsonMap[field.Name] = json.RawMessage(shared. UnsafeFastStringToReadOnlyBytes(r.Val)) default: - qRecordJsonMap[field.Name] = qRecord[i].Value() + qRecordJsonMap[field.Name] = r.Value() } } qRecordJsonBytes, err := json.Marshal(qRecordJsonMap) @@ -192,7 +192,6 @@ func (esc *ElasticsearchConnector) SyncQRepRecords(ctx context.Context, config * bulkIndexerHasShutdown = true if len(bulkIndexErrors) > 0 { esc.logger.Error("[es] failed to bulk index records", slog.Any("errors", bulkIndexErrors)) - return 0, fmt.Errorf("[es] failed to bulk index records: %v", bulkIndexErrors) } err = esc.FinishQRepPartition(ctx, partition, config.FlowJobName, startTime) From cb1a7deb0d8f0bae536b80ac20cf331fd2ca6dc8 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Thu, 18 Apr 2024 02:15:25 +0530 Subject: [PATCH 2/3] experiment: fix logs --- flow/connectors/connelasticsearch/elasticsearch.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/flow/connectors/connelasticsearch/elasticsearch.go b/flow/connectors/connelasticsearch/elasticsearch.go index 5217c33aff..db324ea0f5 100644 --- a/flow/connectors/connelasticsearch/elasticsearch.go +++ b/flow/connectors/connelasticsearch/elasticsearch.go @@ -168,7 +168,8 @@ func (esc *ElasticsearchConnector) SyncQRepRecords(ctx context.Context, config * bulkIndexErrors = append(bulkIndexErrors, err) } else { bulkIndexErrors = append(bulkIndexErrors, - fmt.Errorf("%s %s %v", res.Error.Type, res.Error.Reason, res.Error.Cause)) + fmt.Errorf("type:%s reason:%s caused by:(%v)", res.Error.Type, + res.Error.Reason, res.Error.Cause)) } }, }) @@ -191,7 +192,9 @@ func (esc *ElasticsearchConnector) SyncQRepRecords(ctx context.Context, config * } bulkIndexerHasShutdown = true if len(bulkIndexErrors) > 0 { - esc.logger.Error("[es] failed to bulk index records", slog.Any("errors", bulkIndexErrors)) + for _, err := range bulkIndexErrors { + esc.logger.Error("[es] failed to index record", slog.Any("err", err)) + } } err = esc.FinishQRepPartition(ctx, partition, config.FlowJobName, startTime) From 64c62a8af604896273bcc83a3940c6025100784d Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Mon, 22 Apr 2024 20:23:39 +0530 Subject: [PATCH 3/3] explicit logging id --- flow/connectors/connelasticsearch/elasticsearch.go | 10 +++++++--- flow/connectors/postgres/qrep.go | 5 +---- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/flow/connectors/connelasticsearch/elasticsearch.go b/flow/connectors/connelasticsearch/elasticsearch.go index db324ea0f5..01fa27d7e7 100644 --- a/flow/connectors/connelasticsearch/elasticsearch.go +++ b/flow/connectors/connelasticsearch/elasticsearch.go @@ -159,7 +159,7 @@ func (esc *ElasticsearchConnector) SyncQRepRecords(ctx context.Context, config * Body: bytes.NewReader(qRecordJsonBytes), // OnFailure is called for each failed operation, log and let parent handle - OnFailure: func(ctx context.Context, _ esutil.BulkIndexerItem, + OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error, ) { bulkIndexMutex.Lock() @@ -167,9 +167,13 @@ func (esc *ElasticsearchConnector) SyncQRepRecords(ctx context.Context, config * if err != nil { bulkIndexErrors = append(bulkIndexErrors, err) } else { + causeString := "" + if res.Error.Cause.Type != "" || res.Error.Cause.Reason != "" { + causeString = fmt.Sprintf("(caused by type:%s reason:%s)", res.Error.Cause.Type, res.Error.Cause.Reason) + } bulkIndexErrors = append(bulkIndexErrors, - fmt.Errorf("type:%s reason:%s caused by:(%v)", res.Error.Type, - res.Error.Reason, res.Error.Cause)) + fmt.Errorf("id:%s type:%s reason:%s %s", item.DocumentID, res.Error.Type, + res.Error.Reason, causeString)) } }, }) diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index 75d78188e5..8ec936d2ea 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -123,10 +123,7 @@ func (c *PostgresConnector) getNumRowsPartitions( } // Calculate the number of partitions - numPartitions := totalRows.Int64 / numRowsPerPartition - if totalRows.Int64%numRowsPerPartition != 0 { - numPartitions++ - } + numPartitions := shared.DivCeil(totalRows.Int64, numRowsPerPartition) c.logger.Info(fmt.Sprintf("total rows: %d, num partitions: %d, num rows per partition: %d", totalRows.Int64, numPartitions, numRowsPerPartition))