Commit

Merge branch 'main' into wait-for-at-least-one

serprex authored Nov 17, 2023
2 parents 6638b31 + 0d12f09 commit e09f0cc
Showing 28 changed files with 1,180 additions and 2,151 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/flow.yml
@@ -81,12 +81,12 @@ jobs:
           json: ${{ secrets.GCS_CREDS }}
 
       - name: create hstore extension and increase logical replication limits
-        run: |
+        run: >
           docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "CREATE EXTENSION hstore;"
-          docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET wal_level=logical;"
-          docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET max_replication_slots=100;"
-          docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET max_wal_senders=100;"
-          docker exec pg_cdc psql -h localhost -p 5432 -U postgres -c "ALTER SYSTEM SET max_connections=1024;"
+          -c "ALTER SYSTEM SET wal_level=logical;"
+          -c "ALTER SYSTEM SET max_replication_slots=192;"
+          -c "ALTER SYSTEM SET max_wal_senders=256;"
+          -c "ALTER SYSTEM SET max_connections=2048;" &&
           docker restart pg_cdc
         working-directory: ./flow
         env:
@@ -95,7 +95,7 @@ jobs:
 
       - name: run tests
         run: |
-          gotestsum --format testname -- -p 32 ./... -timeout 2400s
+          gotestsum --format testname -- -p 16 ./... -timeout 2400s
         working-directory: ./flow
         env:
           AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
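
Note on the run: | → run: > change: YAML's literal block scalar (|) keeps newlines, so the old step executed five separate docker exec pg_cdc psql ... commands. The folded scalar (>) joins the lines with spaces into a single command, so the new step makes one psql invocation carrying all five -c statements, and the trailing && restarts pg_cdc only if every statement succeeded. The limits themselves also rise (replication slots 100 → 192, WAL senders 100 → 256, connections 1024 → 2048), while the test step halves gotestsum parallelism from -p 32 to -p 16.
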
13 changes: 6 additions & 7 deletions flow/activities/flowable.go
@@ -185,9 +185,9 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
         "flowName": input.FlowConnectionConfigs.FlowJobName,
     }).Info("pulling records...")
 
-    tblNameMapping := make(map[string]string)
+    tblNameMapping := make(map[string]model.NameAndExclude)
     for _, v := range input.FlowConnectionConfigs.TableMappings {
-        tblNameMapping[v.SourceTableIdentifier] = v.DestinationTableIdentifier
+        tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude)
     }
 
     idleTimeout := utils.GetEnvInt("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", 10)
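
This hunk switches the CDC table-name mapping from bare destination strings to values that also carry each table's excluded columns. The model package is not part of this diff; purely for orientation, a minimal sketch of what NameAndExclude and its constructor might look like (hypothetical — the real definitions live in flow/model and may differ):

    // Hypothetical sketch of flow/model; not part of this commit.
    package model

    type NameAndExclude struct {
        Name    string              // destination table identifier
        Exclude map[string]struct{} // column names to exclude from replication
    }

    func NewNameAndExclude(name string, exclude []string) NameAndExclude {
        exset := make(map[string]struct{}, len(exclude))
        for _, col := range exclude {
            exset[col] = struct{}{}
        }
        return NameAndExclude{Name: name, Exclude: exset}
    }
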
@@ -254,7 +254,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
         log.WithFields(log.Fields{"flowName": input.FlowConnectionConfigs.FlowJobName}).Info("no records to push")
         syncResponse := &model.SyncResponse{}
         syncResponse.RelationMessageMapping = <-recordBatch.RelationMessageMapping
-        syncResponse.TableSchemaDeltas = recordBatch.WaitForSchemaDeltas()
+        syncResponse.TableSchemaDeltas = recordBatch.WaitForSchemaDeltas(input.FlowConnectionConfigs.TableMappings)
         return syncResponse, nil
     }
 
@@ -323,7 +323,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
     if err != nil {
         return nil, err
     }
-    res.TableSchemaDeltas = recordBatch.WaitForSchemaDeltas()
+    res.TableSchemaDeltas = recordBatch.WaitForSchemaDeltas(input.FlowConnectionConfigs.TableMappings)
     res.RelationMessageMapping = <-recordBatch.RelationMessageMapping
 
     pushedRecordsWithCount := fmt.Sprintf("pushed %d records", numRecords)
@@ -523,7 +523,6 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
     var stream *model.QRecordStream
     bufferSize := shared.FetchAndChannelSize
     var wg sync.WaitGroup
-    var numRecords int64
 
     var goroutineErr error = nil
     if config.SourcePeer.Type == protos.DBType_POSTGRES {
@@ -533,7 +532,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
         pullPgRecords := func() {
             pgConn := srcConn.(*connpostgres.PostgresConnector)
             tmp, err := pgConn.PullQRepRecordStream(config, partition, stream)
-            numRecords = int64(tmp)
+            numRecords := int64(tmp)
             if err != nil {
                 log.WithFields(log.Fields{
                     "flowName": config.FlowJobName,
@@ -554,7 +553,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
         if err != nil {
             return fmt.Errorf("failed to pull records: %w", err)
         }
-        numRecords = int64(recordBatch.NumRecords)
+        numRecords := int64(recordBatch.NumRecords)
         log.WithFields(log.Fields{
             "flowName": config.FlowJobName,
         }).Infof("pulled %d records\n", len(recordBatch.Records))
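
The removed var numRecords int64 was shared by both branches above, and in the Postgres branch it was assigned inside the pullPgRecords goroutine; switching to := keeps each count local to the code that produces it, so no goroutine writes an outer variable without synchronization. A standalone sketch of the two shapes (countRecords is a hypothetical stand-in for the pull call):

    package main

    import "fmt"

    // countRecords is a hypothetical stand-in for the record pull.
    func countRecords() int64 { return 42 }

    func main() {
        // Racy shape: a goroutine writes a shared outer variable; any
        // concurrent read of `shared` would be a data race.
        var shared int64
        done := make(chan struct{})
        go func() {
            shared = countRecords()
            close(done)
        }()
        <-done // reading `shared` is only safe after this synchronization

        // Safer shape: keep the count goroutine-local and hand it to the
        // consumer over a channel.
        results := make(chan int64, 1)
        go func() { results <- countRecords() }()
        fmt.Println(shared, <-results)
    }
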
118 changes: 9 additions & 109 deletions flow/connectors/eventhub/eventhub.go
@@ -4,17 +4,13 @@ import (
     "context"
     "errors"
     "fmt"
-    "sync"
-    "sync/atomic"
-    "time"
 
     "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
     azeventhubs "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
     metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
     "github.com/PeerDB-io/peer-flow/connectors/utils"
     "github.com/PeerDB-io/peer-flow/generated/protos"
     "github.com/PeerDB-io/peer-flow/model"
-    cmap "github.com/orcaman/concurrent-map/v2"
     log "github.com/sirupsen/logrus"
 )

@@ -127,23 +123,9 @@ func (c *EventHubConnector) processBatch(
     maxParallelism int64,
 ) (uint32, error) {
     ctx := context.Background()
-
-    tableNameRowsMapping := cmap.New[uint32]()
     batchPerTopic := NewHubBatches(c.hubManager)
     toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns)
 
-    flushBatch := func() error {
-        err := c.sendEventBatch(ctx, batchPerTopic, maxParallelism, flowJobName, tableNameRowsMapping)
-        if err != nil {
-            log.WithFields(log.Fields{
-                "flowName": flowJobName,
-            }).Infof("failed to send event batch: %v", err)
-            return err
-        }
-        batchPerTopic.Clear()
-        return nil
-    }
-
     numRecords := 0
     for record := range batch.GetRecords() {
         numRecords++
@@ -182,14 +164,15 @@
         "flowName": flowJobName,
     }).Infof("processed %d records for sending", numRecords)
 
-    err := flushBatch()
-    if err != nil {
-        return 0, err
+    flushErr := batchPerTopic.flushAllBatches(ctx, maxParallelism, flowJobName)
+    if flushErr != nil {
+        return 0, flushErr
     }
+    batchPerTopic.Clear()
 
     log.WithFields(log.Fields{
         "flowName": flowJobName,
-    }).Infof("[total] successfully sent %d records to event hub", numRecords)
+    }).Infof("[total] successfully flushed %d records to event hub", numRecords)
     return uint32(numRecords), nil
 }
 
@@ -256,101 +239,18 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
     }, nil
 }
 
-func (c *EventHubConnector) sendEventBatch(
-    ctx context.Context,
-    events *HubBatches,
-    maxParallelism int64,
-    flowName string,
-    tableNameRowsMapping cmap.ConcurrentMap[string, uint32]) error {
-    if events.Len() == 0 {
-        log.WithFields(log.Fields{
-            "flowName": flowName,
-        }).Infof("no events to send")
-        return nil
-    }
-
-    var numEventsPushed int32
-    var wg sync.WaitGroup
-    var once sync.Once
-    var firstErr error
-    // Limiting concurrent sends
-    guard := make(chan struct{}, maxParallelism)
-
-    events.ForEach(func(tblName ScopedEventhub, eventBatch *azeventhubs.EventDataBatch) {
-        guard <- struct{}{}
-        wg.Add(1)
-        go func(tblName ScopedEventhub, eventBatch *azeventhubs.EventDataBatch) {
-            defer func() {
-                <-guard
-                wg.Done()
-            }()
-
-            numEvents := eventBatch.NumEvents()
-            err := c.sendBatch(ctx, tblName, eventBatch)
-            if err != nil {
-                once.Do(func() { firstErr = err })
-                return
-            }
-
-            atomic.AddInt32(&numEventsPushed, numEvents)
-            log.WithFields(log.Fields{
-                "flowName": flowName,
-            }).Infof("pushed %d events to event hub: %s", numEvents, tblName)
-            rowCount, ok := tableNameRowsMapping.Get(tblName.ToString())
-            if !ok {
-                rowCount = uint32(0)
-            }
-            rowCount += uint32(numEvents)
-            tableNameRowsMapping.Set(tblName.ToString(), rowCount)
-        }(tblName, eventBatch)
-    })
-
-    wg.Wait()
-
-    if firstErr != nil {
-        log.Error(firstErr)
-        return firstErr
-    }
-
-    log.Infof("[sendEventBatch] successfully sent %d events to event hub", numEventsPushed)
-    return nil
-}
-
-func (c *EventHubConnector) sendBatch(
-    ctx context.Context,
-    tblName ScopedEventhub,
-    events *azeventhubs.EventDataBatch,
-) error {
-    subCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
-    defer cancel()
-
-    hub, err := c.hubManager.GetOrCreateHubClient(subCtx, tblName)
-    if err != nil {
-        return err
-    }
-
-    opts := &azeventhubs.SendEventDataBatchOptions{}
-    err = hub.SendEventDataBatch(subCtx, events, opts)
-    if err != nil {
-        return err
-    }
-
-    log.Infof("successfully sent %d events to event hub topic - %s", events.NumEvents(), tblName.ToString())
-    return nil
-}
-
 func (c *EventHubConnector) CreateRawTable(req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) {
     // create topics for each table
     // key is the source table and value is the "eh_peer.eh_topic" that ought to be used.
     tableMap := req.GetTableNameMapping()
 
-    for _, table := range tableMap {
+    for _, destinationTable := range tableMap {
         // parse peer name and topic name.
-        name, err := NewScopedEventhub(table)
+        name, err := NewScopedEventhub(destinationTable)
         if err != nil {
             log.WithFields(log.Fields{
                 "flowName": req.FlowJobName,
-                "table":    table,
+                "table":    destinationTable,
             }).Errorf("failed to parse peer and topic name: %v", err)
             return nil, err
         }
@@ -359,7 +259,7 @@ func (c *EventHubConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr
         if err != nil {
             log.WithFields(log.Fields{
                 "flowName": req.FlowJobName,
-                "table":    table,
+                "table":    destinationTable,
             }).Errorf("failed to ensure event hub exists: %v", err)
             return nil, err
         }
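
The scaffolding deleted above — a buffered-channel semaphore plus sync.WaitGroup, with sync.Once capturing the first error — is replaced in hub_batches.go below by errgroup with SetLimit, which gives the same bounded parallelism and first-error semantics in far less code. A self-contained sketch of the two shapes (items and send are hypothetical stand-ins):

    package main

    import (
        "context"
        "fmt"
        "sync"

        "golang.org/x/sync/errgroup"
    )

    // send is a hypothetical stand-in for a per-batch send call.
    func send(ctx context.Context, item int) error { return nil }

    func main() {
        ctx := context.Background()
        items := []int{1, 2, 3, 4}
        maxParallelism := int64(2)

        // Before: hand-rolled bounded concurrency with first-error capture.
        guard := make(chan struct{}, maxParallelism) // counting semaphore
        var wg sync.WaitGroup
        var once sync.Once
        var firstErr error
        for _, it := range items {
            guard <- struct{}{}
            wg.Add(1)
            go func(it int) {
                defer func() { <-guard; wg.Done() }()
                if err := send(ctx, it); err != nil {
                    once.Do(func() { firstErr = err })
                }
            }(it)
        }
        wg.Wait()

        // After: errgroup bounds parallelism and returns the first error;
        // gCtx is cancelled as soon as any send fails.
        g, gCtx := errgroup.WithContext(ctx)
        g.SetLimit(int(maxParallelism))
        for _, it := range items {
            it := it // capture loop variable (needed before Go 1.22)
            g.Go(func() error { return send(gCtx, it) })
        }
        fmt.Println(firstErr, g.Wait())
    }
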
63 changes: 63 additions & 0 deletions flow/connectors/eventhub/hub_batches.go
@@ -4,8 +4,12 @@ import (
     "context"
     "fmt"
     "strings"
+    "sync/atomic"
+    "time"
 
     azeventhubs "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
+    log "github.com/sirupsen/logrus"
+    "golang.org/x/sync/errgroup"
 )
 
 // multimap from ScopedEventhub to *azeventhubs.EventDataBatch
@@ -79,6 +83,65 @@ func (h *HubBatches) ForEach(fn func(ScopedEventhub, *azeventhubs.EventDataBatch
     }
 }
 
+func (h *HubBatches) sendBatch(
+    ctx context.Context,
+    tblName ScopedEventhub,
+    events *azeventhubs.EventDataBatch,
+) error {
+    subCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
+    defer cancel()
+
+    hub, err := h.manager.GetOrCreateHubClient(subCtx, tblName)
+    if err != nil {
+        return err
+    }
+
+    opts := &azeventhubs.SendEventDataBatchOptions{}
+    err = hub.SendEventDataBatch(subCtx, events, opts)
+    if err != nil {
+        return err
+    }
+
+    log.Infof("successfully sent %d events to event hub topic - %s", events.NumEvents(), tblName.ToString())
+    return nil
+}
+
+func (h *HubBatches) flushAllBatches(
+    ctx context.Context,
+    maxParallelism int64,
+    flowName string,
+) error {
+    if h.Len() == 0 {
+        log.WithFields(log.Fields{
+            "flowName": flowName,
+        }).Infof("no events to send")
+        return nil
+    }
+
+    var numEventsPushed int32
+    g, gCtx := errgroup.WithContext(ctx)
+    g.SetLimit(int(maxParallelism))
+    h.ForEach(func(tblName ScopedEventhub, eventBatch *azeventhubs.EventDataBatch) {
+        g.Go(func() error {
+            numEvents := eventBatch.NumEvents()
+            err := h.sendBatch(gCtx, tblName, eventBatch)
+            if err != nil {
+                return err
+            }
+
+            atomic.AddInt32(&numEventsPushed, numEvents)
+            log.WithFields(log.Fields{
+                "flowName": flowName,
+            }).Infof("pushed %d events to event hub: %s", numEvents, tblName)
+            return nil
+        })
+    })
+
+    log.Infof("[sendEventBatch] successfully sent %d events in total to event hub",
+        numEventsPushed)
+    return g.Wait()
+}
+
 // Clear removes all batches from the HubBatches
 func (h *HubBatches) Clear() {
     h.batches = make(map[ScopedEventhub][]*azeventhubs.EventDataBatch)
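
One caveat in flushAllBatches as committed: the "[sendEventBatch] successfully sent ..." log line executes before g.Wait() returns, so the total it prints can be partial, and numEventsPushed is read without atomic.LoadInt32. A sketch of the conventional ending — waiting first, then reading the counter atomically (not the committed code):

    // Sketch of an alternative tail for flushAllBatches; not part of this commit.
    if err := g.Wait(); err != nil {
        return err
    }
    log.Infof("successfully sent %d events in total to event hub",
        atomic.LoadInt32(&numEventsPushed))
    return nil
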
(Diff truncated: the remaining 24 of 28 changed files are not shown.)
