Skip to content

Commit

Permalink
Merge branch 'main' into resync-qrep-mirror
Browse files (browse the repository at this point in the history)
  • Loading branch information
heavycrystal authored Nov 7, 2023
2 parents 60106c6 + 75519d9 commit 5c6872b
Show file tree
Hide file tree
Showing 50 changed files with 1,389 additions and 731 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
--health-timeout 5s
--health-retries 5
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
with:
submodules: recursive

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/customer-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
packages: write
steps:
- name: checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
with:
submodules: recursive

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/dev-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
packages: write
steps:
- name: checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
with:
submodules: recursive

Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ jobs:
--health-retries 5
steps:
- name: checkout sources
uses: actions/checkout@v3
uses: actions/checkout@v4

- uses: actions/setup-go@v3
with:
go-version: ">=1.19.0"
go-version: ">=1.21.0"

- name: install gotestsum
run: |
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/golang-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
runs-on: ${{ matrix.runner }}
steps:
- name: checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
with:
submodules: recursive

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/rust-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
runs-on: ${{ matrix.runner }}
steps:
- name: checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
with:
submodules: recursive

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/stable-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
packages: write
steps:
- name: checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
with:
submodules: recursive

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ui-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
runs-on: ${{ matrix.runner }}
steps:
- name: checkout
uses: actions/checkout@v3
uses: actions/checkout@v4

- name: Install Node.js dependencies
working-directory: ui
Expand Down
4 changes: 3 additions & 1 deletion .github/workflows/ui-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
runs-on: ${{ matrix.runner }}
steps:
- name: checkout
uses: actions/checkout@v3
uses: actions/checkout@v4

- name: Install Node.js dependencies
working-directory: ui
Expand All @@ -34,3 +34,5 @@ jobs:
prettier: true
eslint_dir: ui
prettier_dir: ui
eslint_args: "--max-warnings 0"
eslint_extensions: js,ts,jsx,tsx
60 changes: 28 additions & 32 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func NewEventHubConnector(
return nil, err
}

hubManager := NewEventHubManager(ctx, defaultAzureCreds, config)
hubManager := NewEventHubManager(defaultAzureCreds, config)
metadataSchemaName := "peerdb_eventhub_metadata" // #nosec G101
pgMetadata, err := metadataStore.NewPostgresMetadataStore(ctx, config.GetMetadataDb(),
metadataSchemaName)
Expand Down Expand Up @@ -124,7 +124,6 @@ func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error
func (c *EventHubConnector) processBatch(
flowJobName string,
batch *model.CDCRecordStream,
eventsPerBatch int,
maxParallelism int64,
) (uint32, error) {
ctx := context.Background()
Expand All @@ -133,6 +132,18 @@ func (c *EventHubConnector) processBatch(
batchPerTopic := NewHubBatches(c.hubManager)
toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns)

flushBatch := func() error {
err := c.sendEventBatch(ctx, batchPerTopic, maxParallelism, flowJobName, tableNameRowsMapping)
if err != nil {
log.WithFields(log.Fields{
"flowName": flowJobName,
}).Infof("failed to send event batch: %v", err)
return err
}
batchPerTopic.Clear()
return nil
}

numRecords := 0
for record := range batch.GetRecords() {
numRecords++
Expand All @@ -144,18 +155,6 @@ func (c *EventHubConnector) processBatch(
return 0, err
}

flushBatch := func() error {
err := c.sendEventBatch(ctx, batchPerTopic, maxParallelism, flowJobName, tableNameRowsMapping)
if err != nil {
log.WithFields(log.Fields{
"flowName": flowJobName,
}).Infof("failed to send event batch: %v", err)
return err
}
batchPerTopic.Clear()
return nil
}

topicName, err := NewScopedEventhub(record.GetTableName())
if err != nil {
log.WithFields(log.Fields{
Expand All @@ -172,19 +171,20 @@ func (c *EventHubConnector) processBatch(
return 0, err
}

if (numRecords)%eventsPerBatch == 0 {
err := flushBatch()
if err != nil {
return 0, err
}
if numRecords%1000 == 0 {
log.WithFields(log.Fields{
"flowName": flowJobName,
}).Infof("processed %d records for sending", numRecords)
}
}

if batchPerTopic.Len() > 0 {
err := c.sendEventBatch(ctx, batchPerTopic, maxParallelism, flowJobName, tableNameRowsMapping)
if err != nil {
return 0, err
}
log.WithFields(log.Fields{
"flowName": flowJobName,
}).Infof("processed %d records for sending", numRecords)

err := flushBatch()
if err != nil {
return 0, err
}

log.WithFields(log.Fields{
Expand All @@ -203,10 +203,6 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
shutdown <- true
}()

eventsPerBatch := int(req.PushBatchSize)
if eventsPerBatch <= 0 {
eventsPerBatch = 10000
}
maxParallelism := req.PushParallelism
if maxParallelism <= 0 {
maxParallelism = 10
Expand All @@ -221,13 +217,13 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
// otherwise, we block until processBatch is done.
if utils.GetEnvBool("PEERDB_BETA_EVENTHUB_PUSH_ASYNC", false) {
go func() {
numRecords, err = c.processBatch(req.FlowJobName, batch, eventsPerBatch, maxParallelism)
numRecords, err = c.processBatch(req.FlowJobName, batch, maxParallelism)
if err != nil {
log.Errorf("[async] failed to process batch: %v", err)
}
}()
} else {
numRecords, err = c.processBatch(req.FlowJobName, batch, eventsPerBatch, maxParallelism)
numRecords, err = c.processBatch(req.FlowJobName, batch, maxParallelism)
if err != nil {
log.Errorf("failed to process batch: %v", err)
return nil, err
Expand Down Expand Up @@ -316,7 +312,7 @@ func (c *EventHubConnector) sendEventBatch(
return firstErr
}

log.Infof("successfully sent %d events to event hub", numEventsPushed)
log.Infof("[sendEventBatch] successfully sent %d events to event hub", numEventsPushed)
return nil
}

Expand All @@ -328,7 +324,7 @@ func (c *EventHubConnector) sendBatch(
subCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()

hub, err := c.hubManager.GetOrCreateHubClient(tblName)
hub, err := c.hubManager.GetOrCreateHubClient(subCtx, tblName)
if err != nil {
return err
}
Expand Down
34 changes: 27 additions & 7 deletions flow/connectors/eventhub/hubmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"strings"
"sync"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
Expand All @@ -24,7 +25,6 @@ type EventHubManager struct {
}

func NewEventHubManager(
ctx context.Context,
creds *azidentity.DefaultAzureCredential,
groupConfig *protos.EventHubGroupConfig,
) *EventHubManager {
Expand All @@ -40,7 +40,8 @@ func NewEventHubManager(
}
}

func (m *EventHubManager) GetOrCreateHubClient(name ScopedEventhub) (*azeventhubs.ProducerClient, error) {
func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedEventhub) (
*azeventhubs.ProducerClient, error) {
ehConfig, ok := m.peerConfig.Get(name.PeerName)
if !ok {
return nil, fmt.Errorf("eventhub '%s' not configured", name)
Expand All @@ -53,9 +54,27 @@ func (m *EventHubManager) GetOrCreateHubClient(name ScopedEventhub) (*azeventhub
namespace = fmt.Sprintf("%s.servicebus.windows.net", namespace)
}

hub, ok := m.hubs.Load(name)
if !ok {
opts := &azeventhubs.ProducerClientOptions{}
var hubConnectOK bool
var hub any
hub, hubConnectOK = m.hubs.Load(name)
if hubConnectOK {
hubTmp := hub.(*azeventhubs.ProducerClient)
_, err := hubTmp.GetEventHubProperties(ctx, nil)
if err != nil {
log.Infof("eventhub %s not reachable. Will re-establish connection and re-create it. Err: %v", name, err)
m.hubs.Delete(name)
hubConnectOK = false
}
}

if !hubConnectOK {
opts := &azeventhubs.ProducerClientOptions{
RetryOptions: azeventhubs.RetryOptions{
MaxRetries: 32,
RetryDelay: 2 * time.Second,
MaxRetryDelay: 16 * time.Second,
},
}
hub, err := azeventhubs.NewProducerClient(namespace, name.Eventhub, m.creds, opts)
if err != nil {
return nil, fmt.Errorf("failed to create eventhub client: %v", err)
Expand All @@ -67,8 +86,9 @@ func (m *EventHubManager) GetOrCreateHubClient(name ScopedEventhub) (*azeventhub
return hub.(*azeventhubs.ProducerClient), nil
}

func (m *EventHubManager) CreateEventDataBatch(ctx context.Context, name ScopedEventhub) (*azeventhubs.EventDataBatch, error) {
hub, err := m.GetOrCreateHubClient(name)
func (m *EventHubManager) CreateEventDataBatch(ctx context.Context, name ScopedEventhub) (
*azeventhubs.EventDataBatch, error) {
hub, err := m.GetOrCreateHubClient(ctx, name)
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ func (p *PostgresCDCSource) consumeStream(
records.SignalAsEmpty()
}
records.RelationMessageMapping <- &p.relationMessageMapping
log.Infof("[finished] PullRecords streamed %d records", len(localRecords))
}()

shutdown := utils.HeartbeatRoutine(p.ctx, 10*time.Second, func() string {
Expand Down
1 change: 1 addition & 0 deletions ui/app/dto/MirrorsDTO.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ export type TableMapRow = {
source: string;
destination: string;
partitionKey: string;
selected: boolean;
};
13 changes: 13 additions & 0 deletions ui/app/globalstate/time.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { create } from 'zustand';

// Shape of the timezone store's state: the current value plus its setter.
interface TZState {
// Timezone string currently held by the store.
timezone: string;
// Setter that takes a timezone string `tz` and returns nothing.
setZone: (tz: string) => void;
}

// Zustand store hook for the timezone value; the initial value is 'UTC'.
const useTZStore = create<TZState>()((update) => {
  // Overwrite the stored timezone with the supplied value.
  const setZone = (zone: string): void => {
    update(() => ({ timezone: zone }));
  };

  return { timezone: 'UTC', setZone };
});

export default useTZStore;
Loading

0 comments on commit 5c6872b

Please sign in to comment.