Commit b70abbb

Merge branch 'main' into normalize-split

serprex committed Jan 25, 2024
2 parents 7daabd0 + 251eaad
Showing 26 changed files with 159 additions and 190 deletions.
27 changes: 13 additions & 14 deletions flow/activities/flowable.go
@@ -315,9 +315,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
numRecords := res.NumRecordsSynced
syncDuration := time.Since(syncStartTime)

- slog.InfoContext(ctx, fmt.Sprintf("pushed %d records in %d seconds\n",
- numRecords, int(syncDuration.Seconds())),
- )
+ slog.InfoContext(ctx, fmt.Sprintf("pushed %d records in %d seconds", numRecords, int(syncDuration.Seconds())))

lastCheckpoint, err := recordBatch.GetLastCheckpoint()
if err != nil {
@@ -421,7 +419,7 @@ func (a *FlowableActivity) StartNormalize(
}

// log the number of batches normalized
- slog.InfoContext(ctx, fmt.Sprintf("normalized records from batch %d to batch %d\n",
+ slog.InfoContext(ctx, fmt.Sprintf("normalized records from batch %d to batch %d",
res.StartBatchID, res.EndBatchID))

return res, nil
@@ -499,11 +497,11 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context,

numPartitions := len(partitions.Partitions)

- slog.InfoContext(ctx, fmt.Sprintf("replicating partitions for batch %d - size: %d\n",
+ slog.InfoContext(ctx, fmt.Sprintf("replicating partitions for batch %d - size: %d",
partitions.BatchId, numPartitions),
)
for i, p := range partitions.Partitions {
- slog.InfoContext(ctx, fmt.Sprintf("batch-%d - replicating partition - %s\n", partitions.BatchId, p.PartitionId))
+ slog.InfoContext(ctx, fmt.Sprintf("batch-%d - replicating partition - %s", partitions.BatchId, p.PartitionId))
err := a.replicateQRepPartition(ctx, config, i+1, numPartitions, p, runUUID)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
@@ -545,7 +543,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
}
defer connectors.CloseConnector(dstConn)

- slog.InfoContext(ctx, fmt.Sprintf("replicating partition %s\n", partition.PartitionId))
+ slog.InfoContext(ctx, fmt.Sprintf("replicating partition %s", partition.PartitionId))
shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("syncing partition - %s: %d of %d total.", partition.PartitionId, idx, total)
})
@@ -584,7 +582,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to pull qrep records: %w", err)
}
- slog.InfoContext(ctx, fmt.Sprintf("pulled %d records\n", len(recordBatch.Records)))
+ slog.InfoContext(ctx, fmt.Sprintf("pulled %d records", len(recordBatch.Records)))

err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx, a.CatalogPool, runUUID, partition, int64(len(recordBatch.Records)))
if err != nil {
@@ -605,7 +603,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
}

if rowsSynced == 0 {
- slog.InfoContext(ctx, fmt.Sprintf("no records to push for partition %s\n", partition.PartitionId))
+ slog.InfoContext(ctx, fmt.Sprintf("no records to push for partition %s", partition.PartitionId))
pullCancel()
} else {
wg.Wait()
@@ -619,7 +617,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
return err
}

- slog.InfoContext(ctx, fmt.Sprintf("pushed %d records\n", rowsSynced))
+ slog.InfoContext(ctx, fmt.Sprintf("pushed %d records", rowsSynced))
}

err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition)
@@ -726,6 +724,7 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
sendTimeout := 10 * time.Minute
ticker := time.NewTicker(sendTimeout)
defer ticker.Stop()
+
activity.RecordHeartbeat(ctx, "sending walheartbeat every 10 minutes")
for {
select {
@@ -792,7 +791,7 @@ func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context,
}
defer connectors.CloseConnector(srcConn)
pgSrcConn := srcConn.(*connpostgres.PostgresConnector)
- slog.InfoContext(ctx, fmt.Sprintf("current last partition value is %v\n", last))
+ slog.InfoContext(ctx, fmt.Sprintf("current last partition value is %v", last))
attemptCount := 1
for {
activity.RecordHeartbeat(ctx, fmt.Sprintf("no new rows yet, attempt #%d", attemptCount))
@@ -908,7 +907,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
}
defer connectors.CloseConnector(dstConn)

slog.InfoContext(ctx, "replicating xmin\n")
slog.InfoContext(ctx, "replicating xmin")

bufferSize := shared.FetchAndChannelSize
errGroup, errCtx := errgroup.WithContext(ctx)
@@ -974,7 +973,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
}

if rowsSynced == 0 {
slog.InfoContext(ctx, "no records to push for xmin\n")
slog.InfoContext(ctx, "no records to push for xmin")
} else {
err := errGroup.Wait()
if err != nil {
@@ -987,7 +986,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
return 0, err
}

- slog.InfoContext(ctx, fmt.Sprintf("pushed %d records\n", rowsSynced))
+ slog.InfoContext(ctx, fmt.Sprintf("pushed %d records", rowsSynced))
}

err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition)
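
A note on the pattern above: every hunk in this file strips a trailing \n from a log message. log/slog frames each record itself, so an embedded newline just ends up inside the msg attribute. A minimal standalone sketch of the difference (not code from this repo):

```go
package main

import (
	"fmt"
	"log/slog"
)

func main() {
	numRecords := 5
	// With a trailing \n, the newline becomes part of the message value
	// (msg="pushed 5 records\n" in logfmt output) and garbles log lines.
	slog.Info(fmt.Sprintf("pushed %d records\n", numRecords))

	// slog terminates each record itself; no newline needed.
	slog.Info(fmt.Sprintf("pushed %d records", numRecords))
}
```
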
2 changes: 1 addition & 1 deletion flow/activities/slot.go
@@ -77,8 +77,8 @@ func (a *FlowableActivity) recordSlotSizePeriodically(

timeout := 5 * time.Minute
ticker := time.NewTicker(timeout)
-
defer ticker.Stop()
+
for {
select {
case <-ticker.C:
5 changes: 4 additions & 1 deletion flow/connectors/eventhub/eventhub.go
@@ -202,6 +202,9 @@ func (c *EventHubConnector) processBatch(
c.logger.Info("processBatch", slog.Int("number of records processed for sending", int(curNumRecords)))
}

+ case <-c.ctx.Done():
+ return 0, fmt.Errorf("[eventhub] context cancelled %w", c.ctx.Err())
+
case <-ticker.C:
err := batchPerTopic.flushAllBatches(ctx, flowJobName)
if err != nil {
@@ -213,7 +216,7 @@
lastUpdatedOffset = lastSeenLSN
c.logger.Info("processBatch", slog.Int64("updated last offset", lastSeenLSN))
if err != nil {
return 0, fmt.Errorf("failed to update last offset: %v", err)
return 0, fmt.Errorf("failed to update last offset: %w", err)
}
}

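
The added <-c.ctx.Done() case gives the batching loop a prompt exit once the connector's context is cancelled, instead of blocking until the next record or flush tick. A sketch of the same three-way select under assumed names (flushLoop, records, and flush are illustrative, not the connector's API):

```go
package example

import (
	"context"
	"fmt"
	"time"
)

// flushLoop buffers incoming records and flushes on a timer, returning
// as soon as ctx is cancelled rather than blocking on the other cases.
func flushLoop(ctx context.Context, records <-chan string, flush func() error) error {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case _, ok := <-records:
			if !ok {
				return flush() // producer done: final flush
			}
			// accumulate the record into the current batch here
		case <-ctx.Done():
			return fmt.Errorf("flush loop cancelled: %w", ctx.Err())
		case <-ticker.C:
			if err := flush(); err != nil {
				return err
			}
		}
	}
}
```
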
9 changes: 8 additions & 1 deletion flow/connectors/eventhub/hubmanager.go
@@ -7,6 +7,7 @@ import (
"log/slog"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
@@ -103,8 +104,13 @@ func (m *EventHubManager) closeProducerClient(ctx context.Context, pc *azeventhu
}

func (m *EventHubManager) Close(ctx context.Context) error {
- var allErrors error
+ numHubsClosed := atomic.Uint32{}
+ shutdown := utils.HeartbeatRoutine(ctx, func() string {
+ return fmt.Sprintf("closed %d eventhub clients", numHubsClosed.Load())
+ })
+ defer shutdown()

+ var allErrors error
m.hubs.Range(func(key any, value any) bool {
name := key.(ScopedEventhub)
hub := value.(*azeventhubs.ProducerClient)
@@ -113,6 +119,7 @@ func (m *EventHubManager) Close(ctx context.Context) error {
slog.Error(fmt.Sprintf("failed to close eventhub client for %v", name), slog.Any("error", err))
allErrors = errors.Join(allErrors, err)
}
+ numHubsClosed.Add(1)
return true
})

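
Close can run long when many producer clients are open, so the diff adds Temporal heartbeats while the Range loop runs; the counter is atomic because the heartbeat closure reads it from another goroutine. A simplified sketch of that cross-goroutine progress counter (closeAll and its types are hypothetical, not the manager's real ones):

```go
package example

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// closeAll closes each client while a reporter goroutine periodically
// publishes progress; the atomic counter makes the cross-goroutine read
// race-free, mirroring numHubsClosed in the diff above.
func closeAll(clients []interface{ Close() error }, report func(string)) error {
	var closed atomic.Uint32
	done := make(chan struct{})
	go func() {
		ticker := time.NewTicker(15 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-done:
				return
			case <-ticker.C:
				report(fmt.Sprintf("closed %d clients", closed.Load()))
			}
		}
	}()
	defer close(done)

	var allErrors error
	for _, c := range clients {
		if err := c.Close(); err != nil {
			allErrors = errors.Join(allErrors, err)
		}
		closed.Add(1)
	}
	return allErrors
}
```
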
2 changes: 1 addition & 1 deletion flow/connectors/postgres/cdc.go
@@ -897,7 +897,7 @@ func (p *PostgresCDCSource) recToTablePKey(req *model.PullRecordsRequest,
if err != nil {
return nil, fmt.Errorf("error getting pkey column value: %w", err)
}
- pkeyColsMerged = append(pkeyColsMerged, []byte(fmt.Sprintf("%v", pkeyColVal.Value))...)
+ pkeyColsMerged = append(pkeyColsMerged, []byte(fmt.Sprint(pkeyColVal.Value))...)
}

return &model.TableWithPkey{
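
For a single operand, fmt.Sprint(v) yields exactly what fmt.Sprintf("%v", v) does while skipping the format-string parse, hence the simpler spelling. For example:

```go
package example

import "fmt"

// pkeyBytes mirrors the change above: fmt.Sprint formats one operand
// identically to fmt.Sprintf("%v", ...) without parsing a format string.
func pkeyBytes(pkeyColVal any) []byte {
	return []byte(fmt.Sprint(pkeyColVal))
}
```
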
33 changes: 19 additions & 14 deletions flow/connectors/utils/avro/avro_writer.go
@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"io"
"log"
"log/slog"
"os"
"sync/atomic"
@@ -185,23 +184,23 @@ func (p *peerDBOCFWriter) WriteOCF(w io.Writer) (int, error) {
}

func (p *peerDBOCFWriter) WriteRecordsToS3(bucketName, key string, s3Creds utils.S3PeerCredentials) (*AvroFile, error) {
- r, w := io.Pipe()
- numRowsWritten := make(chan int, 1)
- go func() {
- defer w.Close()
- numRows, err := p.WriteOCF(w)
- if err != nil {
- log.Fatalf("%v", err)
- }
- numRowsWritten <- numRows
- }()
-
s3svc, err := utils.CreateS3Client(s3Creds)
if err != nil {
slog.Error("failed to create S3 client: ", slog.Any("error", err))
return nil, fmt.Errorf("failed to create S3 client: %w", err)
}

+ r, w := io.Pipe()
+ defer r.Close()
+ var writeOcfError error
+ var numRows int
+ var noPanic bool
+ go func() {
+ defer w.Close()
+ numRows, writeOcfError = p.WriteOCF(w)
+ noPanic = true
+ }()
+
_, err = manager.NewUploader(s3svc).Upload(p.ctx, &s3.PutObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(key),
Expand All @@ -212,11 +211,17 @@ func (p *peerDBOCFWriter) WriteRecordsToS3(bucketName, key string, s3Creds utils
slog.Error("failed to upload file: ", slog.Any("error", err), slog.Any("s3_path", s3Path))
return nil, fmt.Errorf("failed to upload file to path %s: %w", s3Path, err)
}

slog.Info("file uploaded to " + fmt.Sprintf("%s/%s", bucketName, key))

+ if !noPanic {
+ return nil, fmt.Errorf("WriteOCF panicked while writing avro to S3 %s/%s", bucketName, key)
+ }
+ if writeOcfError != nil {
+ return nil, writeOcfError
+ }
+
return &AvroFile{
- NumRecords: <-numRowsWritten,
+ NumRecords: numRows,
StorageLocation: AvroS3Storage,
FilePath: key,
}, nil
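
The old goroutine called log.Fatalf on a WriteOCF error, tearing down the whole worker mid-upload. The rewrite records the error in writeOcfError and sets noPanic only when WriteOCF returns normally, letting the caller distinguish a clean return from an unwound one after Upload completes (the deferred w.Close() runs even during a panic unwind). A recover-based variant of the same hand-off, sketched with a generic write/consume pair rather than the code above:

```go
package example

import (
	"fmt"
	"io"
)

// writeThenConsume pipes the writer's output to the consumer while
// capturing the writer goroutine's failure — including a panic, via
// recover — instead of killing the process with log.Fatalf.
func writeThenConsume(write func(io.Writer) error, consume func(io.Reader) error) error {
	r, w := io.Pipe()
	writeErr := make(chan error, 1)
	go func() {
		defer w.Close() // runs even on panic, unblocking the consumer
		defer func() {
			if p := recover(); p != nil {
				writeErr <- fmt.Errorf("writer panicked: %v", p)
			}
		}()
		writeErr <- write(w)
	}()

	if err := consume(r); err != nil {
		return fmt.Errorf("consume failed: %w", err)
	}
	return <-writeErr // nil on a clean write
}
```
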
6 changes: 5 additions & 1 deletion flow/connectors/utils/heartbeat.go
@@ -16,6 +16,9 @@ func HeartbeatRoutine(
shutdown := make(chan struct{})
go func() {
counter := 0
+ ticker := time.NewTicker(15 * time.Second)
+ defer ticker.Stop()
+
for {
counter += 1
msg := fmt.Sprintf("heartbeat #%d: %s", counter, message())
@@ -25,7 +28,8 @@
return
case <-ctx.Done():
return
- case <-time.After(15 * time.Second):
+ case <-ticker.C:
+ ticker.Reset(15 * time.Second)
}
}
}()
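
time.After allocates a fresh timer on every loop iteration, and before Go 1.23 each abandoned timer lingers until it fires; a single reused Ticker sidesteps that in a loop that lives as long as the flow. (The added ticker.Reset is effectively redundant here, since the ticker already fires at that interval.) The two shapes in miniature:

```go
package example

import "time"

// pollAfter creates a new 15s timer per iteration; before Go 1.23,
// each discarded timer is not collected until it fires.
func pollAfter(stop <-chan struct{}, work func()) {
	for {
		select {
		case <-stop:
			return
		case <-time.After(15 * time.Second):
			work()
		}
	}
}

// pollTicker reuses one timer for the life of the loop.
func pollTicker(stop <-chan struct{}, work func()) {
	ticker := time.NewTicker(15 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			work()
		}
	}
}
```
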
7 changes: 1 addition & 6 deletions flow/e2e/test_utils.go
@@ -518,12 +518,7 @@ func NewTStructuredLogger(logger slog.Logger) *TStructuredLogger {
}

func (l *TStructuredLogger) keyvalsToFields(keyvals []interface{}) slog.Attr {
- var attrs []any
- for i := 0; i < len(keyvals); i += 1 {
- key := fmt.Sprintf("%v", keyvals[i])
- attrs = append(attrs, key)
- }
- return slog.Group("test-log", attrs...)
+ return slog.Group("test-log", keyvals...)
}

func (l *TStructuredLogger) Debug(msg string, keyvals ...interface{}) {
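
The deleted loop rebuilt keyvals with every key and value stringified; slog.Group already accepts alternating key/value pairs as variadic any, so the slice can be forwarded untouched, types intact. Usage looks like:

```go
package example

import "log/slog"

// logEvent forwards caller-supplied key/value pairs straight into a
// group attribute; slog.Group accepts the alternating pairs as-is.
func logEvent(logger *slog.Logger, msg string, keyvals ...any) {
	logger.Info(msg, slog.Group("test-log", keyvals...))
}
```
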
8 changes: 2 additions & 6 deletions flow/workflows/qrep_flow.go
@@ -235,13 +235,9 @@ func (q *QRepFlowExecution) processPartitions(
chunkSize = 1
}

- batches := make([][]*protos.QRepPartition, 0)
+ batches := make([][]*protos.QRepPartition, 0, len(partitions)/chunkSize+1)
for i := 0; i < len(partitions); i += chunkSize {
- end := i + chunkSize
- if end > len(partitions) {
- end = len(partitions)
- }
-
+ end := min(i+chunkSize, len(partitions))
batches = append(batches, partitions[i:end])
}

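
Two small wins in one hunk: the batch slice is preallocated (len(partitions)/chunkSize+1 bounds the chunk count, so append never reallocates) and the end-clamp collapses into the min builtin from Go 1.21. The chunking pattern on its own, as a generic sketch:

```go
package example

// chunk splits items into consecutive windows of at most size elements
// (size must be positive); min has been a builtin since Go 1.21.
func chunk[T any](items []T, size int) [][]T {
	batches := make([][]T, 0, len(items)/size+1)
	for i := 0; i < len(items); i += size {
		end := min(i+size, len(items))
		batches = append(batches, items[i:end])
	}
	return batches
}
```
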
