From e93aafcf36733f0ebf22f20e43f4cec6989951b7 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 25 Aug 2023 23:16:48 +0530 Subject: [PATCH 1/5] adds docker release for test branch --- .github/workflows/test-docker.yml | 73 +++++++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) create mode 100644 .github/workflows/test-docker.yml diff --git a/.github/workflows/test-docker.yml b/.github/workflows/test-docker.yml new file mode 100644 index 0000000000..70e9472fd0 --- /dev/null +++ b/.github/workflows/test-docker.yml @@ -0,0 +1,73 @@ +name: Test Docker Images + +on: + push: + branches: [main] + pull_request: + branches: [main] + +jobs: + docker-build: + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + steps: + - name: checkout + uses: actions/checkout@v3 + with: + submodules: recursive + token: ${{ secrets.SUBMODULE_CHECKOUT }} + + - uses: depot/setup-action@v1 + + - name: Login to GitHub Container Registry + uses: docker/login-action@v2.1.0 + with: + registry: ghcr.io + username: ${{github.actor}} + password: ${{secrets.GITHUB_TOKEN}} + + - name: Set Short Commit Hash + id: vars + run: echo "::set-output name=sha_short::$(git rev-parse --short HEAD)" + + - name: Build (optionally publish) PeerDB Test Image + uses: depot/build-push-action@v1 + with: + token: ${{ secrets.DEPOT_TOKEN }} + context: . + file: stacks/nexus.Dockerfile + push: ${{ github.ref == 'refs/heads/test' }} + tags: | + ghcr.io/peerdb-io/peerdb-server:test-${{ steps.vars.outputs.sha_short }} + + - name: Build (optionally publish) Flow API Test Image + uses: depot/build-push-action@v1 + with: + token: ${{ secrets.DEPOT_TOKEN }} + context: . + file: stacks/flow-api.Dockerfile + push: ${{ github.ref == 'refs/heads/test' }} + tags: | + ghcr.io/peerdb-io/flow-api:test-${{ steps.vars.outputs.sha_short }} + + - name: Build (optionally publish) Flow Worker Test Image + uses: depot/build-push-action@v1 + with: + token: ${{ secrets.DEPOT_TOKEN }} + context: . + file: stacks/flow-worker.Dockerfile + push: ${{ github.ref == 'refs/heads/test' }} + tags: | + ghcr.io/peerdb-io/flow-worker:test-${{ steps.vars.outputs.sha_short }} + + - name: Build (optionally publish) Flow Snapshot Worker Test Image + uses: depot/build-push-action@v1 + with: + token: ${{ secrets.DEPOT_TOKEN }} + context: . + file: stacks/flow-snapshot-worker.Dockerfile + push: ${{ github.ref == 'refs/heads/test' }} + tags: | + ghcr.io/peerdb-io/flow-snapshot-worker:test-${{ steps.vars.outputs.sha_short }} From 5c854a85a995296e1474973a616104cf29175548 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 25 Aug 2023 23:20:02 +0530 Subject: [PATCH 2/5] change triggers in docker --- .github/workflows/test-docker.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test-docker.yml b/.github/workflows/test-docker.yml index 70e9472fd0..3299ba543d 100644 --- a/.github/workflows/test-docker.yml +++ b/.github/workflows/test-docker.yml @@ -2,9 +2,9 @@ name: Test Docker Images on: push: - branches: [main] + branches: [testing] pull_request: - branches: [main] + branches: [testing] jobs: docker-build: From 63cd748dde90cc30157d2e948a139740588f3545 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Fri, 25 Aug 2023 23:37:33 +0530 Subject: [PATCH 3/5] fix: branch name for docker --- .github/workflows/test-docker.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/test-docker.yml b/.github/workflows/test-docker.yml index 3299ba543d..36738ee8d1 100644 --- a/.github/workflows/test-docker.yml +++ b/.github/workflows/test-docker.yml @@ -38,7 +38,7 @@ jobs: token: ${{ secrets.DEPOT_TOKEN }} context: . file: stacks/nexus.Dockerfile - push: ${{ github.ref == 'refs/heads/test' }} + push: ${{ github.ref == 'refs/heads/testing' }} tags: | ghcr.io/peerdb-io/peerdb-server:test-${{ steps.vars.outputs.sha_short }} @@ -48,7 +48,7 @@ jobs: token: ${{ secrets.DEPOT_TOKEN }} context: . file: stacks/flow-api.Dockerfile - push: ${{ github.ref == 'refs/heads/test' }} + push: ${{ github.ref == 'refs/heads/testing' }} tags: | ghcr.io/peerdb-io/flow-api:test-${{ steps.vars.outputs.sha_short }} @@ -58,7 +58,7 @@ jobs: token: ${{ secrets.DEPOT_TOKEN }} context: . file: stacks/flow-worker.Dockerfile - push: ${{ github.ref == 'refs/heads/test' }} + push: ${{ github.ref == 'refs/heads/testing' }} tags: | ghcr.io/peerdb-io/flow-worker:test-${{ steps.vars.outputs.sha_short }} @@ -68,6 +68,6 @@ jobs: token: ${{ secrets.DEPOT_TOKEN }} context: . file: stacks/flow-snapshot-worker.Dockerfile - push: ${{ github.ref == 'refs/heads/test' }} + push: ${{ github.ref == 'refs/heads/testing' }} tags: | ghcr.io/peerdb-io/flow-snapshot-worker:test-${{ steps.vars.outputs.sha_short }} From 0a0c3ed81fb5dcc0ac034afba3aa43d362d456de Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Tue, 5 Sep 2023 00:14:34 +0530 Subject: [PATCH 4/5] adds logs for eh --- flow/connectors/eventhub/eventhub.go | 49 +++++++++++++++++++++------- flow/connectors/eventhub/metadata.go | 8 +++-- 2 files changed, 44 insertions(+), 13 deletions(-) diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index c4fba1e2b0..e6e1e050ff 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -104,8 +104,16 @@ func (c *EventHubConnector) PullRecords(req *model.PullRecordsRequest) (*model.R } func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) { - batch := req.Records + shutdown := utils.HeartbeatRoutine(c.ctx, 1*time.Minute, func() string { + return fmt.Sprintf("syncing records to eventhub with"+ + " push parallelism %d and push batch size %d", + req.PushParallelism, req.PushBatchSize) + }) + defer func() { + shutdown <- true + }() + batch := req.Records eventsPerHeartBeat := 1000 eventsPerBatch := int(req.PushBatchSize) maxParallelism := req.PushParallelism @@ -114,7 +122,9 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S for i, record := range batch.Records { json, err := record.GetItems().ToJSON() if err != nil { - log.Errorf("failed to convert record to json: %v", err) + log.WithFields(log.Fields{ + "flowName": req.FlowJobName, + }).Infof("failed to convert record to json: %v", err) return nil, err } @@ -132,7 +142,8 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S } if (i+1)%eventsPerBatch == 0 { - err := c.sendEventBatch(batchPerTopic, maxParallelism) + err := c.sendEventBatch(batchPerTopic, maxParallelism, + req.FlowJobName) if err != nil { return nil, err } @@ -143,13 +154,16 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S // send the remaining events. if len(batchPerTopic) > 0 { - err := c.sendEventBatch(batchPerTopic, maxParallelism) + err := c.sendEventBatch(batchPerTopic, maxParallelism, + req.FlowJobName) if err != nil { return nil, err } } - log.Infof("[total] successfully sent %d records to event hub", len(batch.Records)) + log.WithFields(log.Fields{ + "flowName": req.FlowJobName, + }).Infof("[total] successfully sent %d records to event hub", len(batch.Records)) err := c.UpdateLastOffset(req.FlowJobName, batch.LastCheckPointID) if err != nil { @@ -164,9 +178,13 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S }, nil } -func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event, maxParallelism int64) error { +func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event, + maxParallelism int64, + flowName string) error { if len(events) == 0 { - log.Info("no events to send") + log.WithFields(log.Fields{ + "flowName": flowName, + }).Infof("no events to send") return nil } @@ -194,7 +212,10 @@ func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event, once.Do(func() { firstErr = err }) return } - + log.WithFields(log.Fields{ + "flowName": flowName, + }).Infof("obtained hub connection and now sending %d events to event hub: %s", + len(eventBatch), tblName) err = hub.SendBatch(subCtx, eventhub.NewEventBatchIterator(eventBatch...)) if err != nil { once.Do(func() { firstErr = err }) @@ -202,6 +223,10 @@ func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event, } atomic.AddInt32(&numEventsPushed, int32(len(eventBatch))) + log.WithFields(log.Fields{ + "flowName": flowName, + }).Infof("pushed %d events to event hub: %s", + numEventsPushed, tblName) }(tblName, eventBatch) } @@ -237,7 +262,7 @@ func (c *EventHubConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr tableMap := req.GetTableNameMapping() for _, table := range tableMap { - err := c.ensureEventHub(c.ctx, table) + err := c.ensureEventHub(c.ctx, table, req.FlowJobName) if err != nil { log.WithFields(log.Fields{ "flowName": req.FlowJobName, @@ -255,7 +280,7 @@ func (c *EventHubConnector) GetTableSchema( panic("get table schema not implemented for event hub") } -func (c *EventHubConnector) ensureEventHub(ctx context.Context, name string) error { +func (c *EventHubConnector) ensureEventHub(ctx context.Context, name string, flowName string) error { hubClient, err := c.getEventHubMgmtClient() if err != nil { return err @@ -282,7 +307,9 @@ func (c *EventHubConnector) ensureEventHub(ctx context.Context, name string) err return err } - log.Infof("event hub %s created", name) + log.WithFields(log.Fields{ + "flowName": flowName, + }).Infof("event hub %s created", name) } else { log.Infof("event hub %s already exists", name) } diff --git a/flow/connectors/eventhub/metadata.go b/flow/connectors/eventhub/metadata.go index 84175b0d27..07bf30b339 100644 --- a/flow/connectors/eventhub/metadata.go +++ b/flow/connectors/eventhub/metadata.go @@ -125,7 +125,9 @@ func (c *EventHubConnector) GetLastOffset(jobName string) (*protos.LastSyncState }, nil } - log.Errorf("failed to get last offset: %v", err) + log.WithFields(log.Fields{ + "flowName": jobName, + }).Errorf("failed to get last offset: %v", err) return nil, err } @@ -155,7 +157,9 @@ func (c *EventHubConnector) UpdateLastOffset(jobName string, offset int64) error } // update the last offset - log.Infof("updating last offset for job `%s` to `%d`", jobName, offset) + log.WithFields(log.Fields{ + "flowName": jobName, + }).Infof("updating last offset for job `%s` to `%d`", jobName, offset) _, err = tx.Exec(c.ctx, ` INSERT INTO `+metadataSchema+`.`+lastSyncStateTableName+` (job_name, last_offset) VALUES ($1, $2) From 4ef7828ec1a9229c6511c52fcca93fa2c3d0eb1c Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Tue, 5 Sep 2023 20:31:52 +0530 Subject: [PATCH 5/5] removes docker file --- .github/workflows/test-docker.yml | 73 ------------------------------- 1 file changed, 73 deletions(-) delete mode 100644 .github/workflows/test-docker.yml diff --git a/.github/workflows/test-docker.yml b/.github/workflows/test-docker.yml deleted file mode 100644 index 36738ee8d1..0000000000 --- a/.github/workflows/test-docker.yml +++ /dev/null @@ -1,73 +0,0 @@ -name: Test Docker Images - -on: - push: - branches: [testing] - pull_request: - branches: [testing] - -jobs: - docker-build: - runs-on: ubuntu-latest - permissions: - contents: read - packages: write - steps: - - name: checkout - uses: actions/checkout@v3 - with: - submodules: recursive - token: ${{ secrets.SUBMODULE_CHECKOUT }} - - - uses: depot/setup-action@v1 - - - name: Login to GitHub Container Registry - uses: docker/login-action@v2.1.0 - with: - registry: ghcr.io - username: ${{github.actor}} - password: ${{secrets.GITHUB_TOKEN}} - - - name: Set Short Commit Hash - id: vars - run: echo "::set-output name=sha_short::$(git rev-parse --short HEAD)" - - - name: Build (optionally publish) PeerDB Test Image - uses: depot/build-push-action@v1 - with: - token: ${{ secrets.DEPOT_TOKEN }} - context: . - file: stacks/nexus.Dockerfile - push: ${{ github.ref == 'refs/heads/testing' }} - tags: | - ghcr.io/peerdb-io/peerdb-server:test-${{ steps.vars.outputs.sha_short }} - - - name: Build (optionally publish) Flow API Test Image - uses: depot/build-push-action@v1 - with: - token: ${{ secrets.DEPOT_TOKEN }} - context: . - file: stacks/flow-api.Dockerfile - push: ${{ github.ref == 'refs/heads/testing' }} - tags: | - ghcr.io/peerdb-io/flow-api:test-${{ steps.vars.outputs.sha_short }} - - - name: Build (optionally publish) Flow Worker Test Image - uses: depot/build-push-action@v1 - with: - token: ${{ secrets.DEPOT_TOKEN }} - context: . - file: stacks/flow-worker.Dockerfile - push: ${{ github.ref == 'refs/heads/testing' }} - tags: | - ghcr.io/peerdb-io/flow-worker:test-${{ steps.vars.outputs.sha_short }} - - - name: Build (optionally publish) Flow Snapshot Worker Test Image - uses: depot/build-push-action@v1 - with: - token: ${{ secrets.DEPOT_TOKEN }} - context: . - file: stacks/flow-snapshot-worker.Dockerfile - push: ${{ github.ref == 'refs/heads/testing' }} - tags: | - ghcr.io/peerdb-io/flow-snapshot-worker:test-${{ steps.vars.outputs.sha_short }}